From cda68b3f447a908c367a95593b9ba12683b23c8e Mon Sep 17 00:00:00 2001 From: netjinho <neto_moises95@hotmail.com> Date: Thu, 4 Oct 2018 19:03:24 +0200 Subject: [PATCH 1/3] Added some getters and setters for taxonomies, warninglists, noticelists and tags & documentation --- pymisp/api.py | 140 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 138 insertions(+), 2 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index eeba111..d605ea2 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -226,7 +226,7 @@ class PyMISP(object): try: json_response = response.json() except ValueError: - # It the server didn't return a JSON blob, we've a problem. + # If the server didn't return a JSON blob, we've a problem. raise PyMISPError(everything_broken.format(response.request.headers, response.request.body, response.text)) errors = [] @@ -1676,63 +1676,199 @@ class PyMISP(object): # ############## Tags ################## def get_tags_list(self): - """Get the list of existing tags""" + """Get the list of existing tags.""" url = urljoin(self.root_url, '/tags') response = self._prepare_request('GET', url) return self._check_response(response)['Tag'] + def get_tag(self, tag_id): + """Get a tag by id.""" + url = urljoin(self.root_url, '/tags/view/{}'.format(tag_id)) + response = self._prepare_request('GET', url) + return self._check_response(response) + + def _set_tag_parameters(self, name, colour, exportable, hide_tag, org_id, count, user_id, numerical_value, + attribute_count, old_tag): + tag = old_tag + if name is not None: + tag['name'] = name + if colour is not None: + tag['colour'] = colour + if exportable is not None: + tag['exportable'] = exportable + if hide_tag is not None: + tag['hide_tag'] = hide_tag + if org_id is not None: + tag['org_id'] = org_id + if count is not None: + tag['count'] = count + if user_id is not None: + tag['user_id'] = user_id + if numerical_value is not None: + tag['numerical_value'] = numerical_value + if attribute_count is not None: + tag['attribute_count'] = attribute_count + + return {'Tag': tag} + + def edit_tag(self, tag_id, name=None, colour=None, exportable=None, hide_tag=None, org_id=None, count=None, + user_id=None, numerical_value=None, attribute_count=None): + """Edit only the provided parameters of a tag.""" + old_tag = self.get_tag(tag_id) + new_tag = self._set_tag_parameters(name, colour, exportable, hide_tag, org_id, count, user_id, + numerical_value, attribute_count, old_tag) + url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id)) + response = self._prepare_request('POST', url, json.dumps(new_tag)) + return self._check_response(response) + + def edit_tag_json(self, json_file, tag_id): + """Edit the tag using a json file.""" + with open(json_file, 'rb') as f: + jdata = json.load(f) + url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id)) + response = self._prepare_request('POST', url, json.dumps(jdata)) + return self._check_response(response) + + def enable_tag(self, tag_id): + """Enable a tag by id.""" + response = self.edit_tag(tag_id, hide_tag=False) + return response + + def disable_tag(self, tag_id): + """Disable a tag by id.""" + response = self.edit_tag(tag_id, hide_tag=True) + return response + # ############## Taxonomies ################## def get_taxonomies_list(self): + """Get all the taxonomies.""" url = urljoin(self.root_url, '/taxonomies') response = self._prepare_request('GET', url) return self._check_response(response) def get_taxonomy(self, taxonomy_id): + """Get a taxonomy by id.""" url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id)) response = self._prepare_request('GET', url) return self._check_response(response) def update_taxonomies(self): + """Update all the taxonomies.""" url = urljoin(self.root_url, '/taxonomies/update') response = self._prepare_request('POST', url) return self._check_response(response) + def enable_taxonomy(self, taxonomy_id): + """Enable a taxonomy by id.""" + url = urljoin(self.root_url, '/taxonomies/enable/{}'.format(taxonomy_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + def disable_taxonomy(self, taxonomy_id): + """Disable a taxonomy by id.""" + url = urljoin(self.root_url, '/taxonomies/disable/{}'.format(taxonomy_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + def get_taxonomy_tags_list(self, taxonomy_id): + """Get all the tags of a taxonomy by id.""" + url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id)) + response = self._prepare_request('GET', url) + return self._check_response(response)["entries"] + + def enable_taxonomy_tags(self, taxonomy_id): + """Enable all the tags of a taxonomy by id.""" + url = urljoin(self.root_url, '/taxonomies/addTag/{}'.format(taxonomy_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + def disable_taxonomy_tags(self, taxonomy_id): + """Disable all the tags of a taxonomy by id.""" + url = urljoin(self.root_url, '/taxonomies/disableTag/{}'.format(taxonomy_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + # ############## WarningLists ################## def get_warninglists(self): + """Get all the warninglists.""" url = urljoin(self.root_url, '/warninglists') response = self._prepare_request('GET', url) return self._check_response(response) def get_warninglist(self, warninglist_id): + """Get a warninglist by id.""" url = urljoin(self.root_url, '/warninglists/view/{}'.format(warninglist_id)) response = self._prepare_request('GET', url) return self._check_response(response) def update_warninglists(self): + """Update all the warninglists.""" url = urljoin(self.root_url, '/warninglists/update') response = self._prepare_request('POST', url) return self._check_response(response) + def enable_warninglist(self, warninglist_id): + """Enable a warninglist by id.""" + url = urljoin(self.root_url, '/warninglists/enableWarninglist/{}/true'.format(warninglist_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + def disable_warninglist(self, warninglist_id): + """Disable a warninglist by id.""" + url = urljoin(self.root_url, '/warninglists/enableWarninglist/{}'.format(warninglist_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + # ############## NoticeLists ################## + + def get_noticelists(self): + """Get all the noticelists.""" + url = urljoin(self.root_url, '/noticelists') + response = self._prepare_request('GET', url) + return self._check_response(response) + + def get_noticelist(self, noticelist_id): + """Get a noticelist by id.""" + url = urljoin(self.root_url, '/noticelists/view/{}'.format(noticelist_id)) + response = self._prepare_request('GET', url) + def update_noticelists(self): + """Update all the noticelists.""" url = urljoin(self.root_url, '/noticelists/update') response = self._prepare_request('POST', url) return self._check_response(response) + def enable_noticelist(self, noticelist_id): + """Enable a noticelist by id.""" + url = urljoin(self.root_url, '/noticelists/enableNoticelist/{}/true'.format(noticelist_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + + def disable_noticelist(self, noticelist_id): + """Disable a noticelist by id.""" + url = urljoin(self.root_url, '/noticelists/enableNoticelist/{}'.format(noticelist_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) + # ############## Galaxies/Clusters ################## def get_galaxies(self): + """Get all the galaxies.""" url = urljoin(self.root_url, '/galaxies') response = self._prepare_request('GET', url) return self._check_response(response) def get_galaxy(self, galaxy_id): + """Get a galaxy by id.""" url = urljoin(self.root_url, '/galaxies/view/{}'.format(galaxy_id)) response = self._prepare_request('GET', url) return self._check_response(response) def update_galaxies(self): + """Update all the galaxies.""" url = urljoin(self.root_url, '/galaxies/update') response = self._prepare_request('POST', url) return self._check_response(response) From 2fa56348e5f24ed753280ccc9eb0f9764f8cb1c8 Mon Sep 17 00:00:00 2001 From: netjinho <neto_moises95@hotmail.com> Date: Thu, 4 Oct 2018 19:31:46 +0200 Subject: [PATCH 2/3] Fixed leaked taxonomy tags problem --- pymisp/api.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index d605ea2..4b3df50 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1767,6 +1767,7 @@ class PyMISP(object): def disable_taxonomy(self, taxonomy_id): """Disable a taxonomy by id.""" + self.disable_taxonomy_tags(taxonomy_id) url = urljoin(self.root_url, '/taxonomies/disable/{}'.format(taxonomy_id)) response = self._prepare_request('POST', url) return self._check_response(response) @@ -1779,9 +1780,11 @@ class PyMISP(object): def enable_taxonomy_tags(self, taxonomy_id): """Enable all the tags of a taxonomy by id.""" - url = urljoin(self.root_url, '/taxonomies/addTag/{}'.format(taxonomy_id)) - response = self._prepare_request('POST', url) - return self._check_response(response) + enabled = self.get_taxonomy(taxonomy_id)['Taxonomy']['enabled'] + if enabled: + url = urljoin(self.root_url, '/taxonomies/addTag/{}'.format(taxonomy_id)) + response = self._prepare_request('POST', url) + return self._check_response(response) def disable_taxonomy_tags(self, taxonomy_id): """Disable all the tags of a taxonomy by id.""" @@ -1834,6 +1837,7 @@ class PyMISP(object): """Get a noticelist by id.""" url = urljoin(self.root_url, '/noticelists/view/{}'.format(noticelist_id)) response = self._prepare_request('GET', url) + return self._check_response(response) def update_noticelists(self): """Update all the noticelists.""" From 9a2610a61f9bc682053c31211507675f14854de4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= <raphael@vinot.info> Date: Fri, 5 Oct 2018 17:45:12 +0200 Subject: [PATCH 3/3] chg: More test cases --- pymisp/api.py | 35 +++++++++--- pymisp/aping.py | 17 +----- tests/testlive_comprehensive.py | 99 +++++++++++++++++++++++++++++---- 3 files changed, 116 insertions(+), 35 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 4b3df50..45d0023 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1712,11 +1712,11 @@ class PyMISP(object): return {'Tag': tag} def edit_tag(self, tag_id, name=None, colour=None, exportable=None, hide_tag=None, org_id=None, count=None, - user_id=None, numerical_value=None, attribute_count=None): + user_id=None, numerical_value=None, attribute_count=None): """Edit only the provided parameters of a tag.""" old_tag = self.get_tag(tag_id) new_tag = self._set_tag_parameters(name, colour, exportable, hide_tag, org_id, count, user_id, - numerical_value, attribute_count, old_tag) + numerical_value, attribute_count, old_tag) url = urljoin(self.root_url, '/tags/edit/{}'.format(tag_id)) response = self._prepare_request('POST', url, json.dumps(new_tag)) return self._check_response(response) @@ -1792,7 +1792,6 @@ class PyMISP(object): response = self._prepare_request('POST', url) return self._check_response(response) - # ############## WarningLists ################## def get_warninglists(self): @@ -1813,17 +1812,35 @@ class PyMISP(object): response = self._prepare_request('POST', url) return self._check_response(response) + def toggle_warninglist(self, warninglist_id=None, warninglist_name=None, force_enable=None): + '''Toggle (enable/disable) the status of a warninglist by ID. + :param warninglist_id: ID of the WarningList + :param force_enable: Force the warning list in the enabled state (does nothing if already enabled) + ''' + if warninglist_id is None and warninglist_name is None: + raise Exception('Either warninglist_id or warninglist_name is required.') + query = {} + if warninglist_id is not None: + if not isinstance(warninglist_id, list): + warninglist_id = [warninglist_id] + query['id'] = warninglist_id + if warninglist_name is not None: + if not isinstance(warninglist_name, list): + warninglist_name = [warninglist_name] + query['name'] = warninglist_name + if force_enable is not None: + query['enabled'] = force_enable + url = urljoin(self.root_url, '/warninglists/toggleEnable') + response = self._prepare_request('POST', url, json.dumps(query)) + return self._check_response(response) + def enable_warninglist(self, warninglist_id): """Enable a warninglist by id.""" - url = urljoin(self.root_url, '/warninglists/enableWarninglist/{}/true'.format(warninglist_id)) - response = self._prepare_request('POST', url) - return self._check_response(response) + return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=True) def disable_warninglist(self, warninglist_id): """Disable a warninglist by id.""" - url = urljoin(self.root_url, '/warninglists/enableWarninglist/{}'.format(warninglist_id)) - response = self._prepare_request('POST', url) - return self._check_response(response) + return self.toggle_warninglist(warninglist_id=warninglist_id, force_enable=False) # ############## NoticeLists ################## diff --git a/pymisp/aping.py b/pymisp/aping.py index a8b2f47..2ced7d6 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -40,22 +40,7 @@ class ExpandedPyMISP(PyMISP): :param warninglist_id: ID of the WarningList :param force_enable: Force the warning list in the enabled state (does nothing is already enabled) ''' - if warninglist_id is None and warninglist_name is None: - raise Exception('Either warninglist_id or warninglist_name is required.') - query = {} - if warninglist_id is not None: - if not isinstance(warninglist_id, list): - warninglist_id = [warninglist_id] - query['id'] = warninglist_id - if warninglist_name is not None: - if not isinstance(warninglist_name, list): - warninglist_name = [warninglist_name] - query['name'] = warninglist_name - if force_enable is not None: - query['enabled'] = force_enable - url = urljoin(self.root_url, '/warninglists/toggleEnable') - response = self._prepare_request('POST', url, json.dumps(query)) - return self._check_response(response) + return super().toggle_warninglist(warninglist_id, warninglist_name, force_enable) def make_timestamp(self, value: DateTypes): if isinstance(value, datetime): diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index c4d5580..47eda79 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -750,27 +750,106 @@ class TestComprehensive(unittest.TestCase): self.admin_misp_connector.delete_event(third.id) def test_update_modules(self): - # warninglist - self.admin_misp_connector.update_warninglists() - r = self.admin_misp_connector.update_warninglists() - self.assertEqual(r['name'], 'All warninglists are up to date already.') - # taxonomies - self.admin_misp_connector.update_taxonomies() - r = self.admin_misp_connector.update_taxonomies() - self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.') # object templates self.admin_misp_connector.update_object_templates() r = self.admin_misp_connector.update_object_templates() self.assertEqual(type(r), list) - # notice lists + + def test_tags(self): + # Get list + tags = self.admin_misp_connector.get_tags_list() + self.assertTrue(isinstance(tags, list)) + # Get tag + for tag in tags: + if not tag['hide_tag']: + break + tag = self.admin_misp_connector.get_tag(tags[0]['id']) + self.assertTrue('name' in tag) + self.admin_misp_connector.disable_tag(tag['id']) + # FIXME: returns the tag with ID 1 + self.admin_misp_connector.enable_tag(tag['id']) + # FIXME: returns the tag with ID 1 + + def test_taxonomies(self): + # Make sure we're up-to-date + self.admin_misp_connector.update_taxonomies() + r = self.admin_misp_connector.update_taxonomies() + self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.') + # Get list + taxonomies = self.admin_misp_connector.get_taxonomies_list() + self.assertTrue(isinstance(taxonomies, list)) + list_name_test = 'tlp' + for tax in taxonomies: + if tax['Taxonomy']['namespace'] == list_name_test: + break + r = self.admin_misp_connector.get_taxonomy(tax['Taxonomy']['id']) + self.assertEqual(r['Taxonomy']['namespace'], list_name_test) + self.assertTrue('enabled' in r['Taxonomy']) + r = self.admin_misp_connector.enable_taxonomy(tax['Taxonomy']['id']) + self.assertEqual(r['message'], 'Taxonomy enabled') + r = self.admin_misp_connector.disable_taxonomy(tax['Taxonomy']['id']) + self.assertEqual(r['message'], 'Taxonomy disabled') + + def test_warninglists(self): + # Make sure we're up-to-date + self.admin_misp_connector.update_warninglists() + r = self.admin_misp_connector.update_warninglists() + self.assertEqual(r['name'], 'All warninglists are up to date already.') + # Get list + r = self.admin_misp_connector.get_warninglists() + # FIXME It returns Warninglists object instead of a list of warning lists directly. This is inconsistent. + warninglists = r['Warninglists'] + self.assertTrue(isinstance(warninglists, list)) + list_name_test = 'List of known hashes with common false-positives (based on Florian Roth input list)' + for wl in warninglists: + if wl['Warninglist']['name'] == list_name_test: + break + testwl = wl['Warninglist'] + r = self.admin_misp_connector.get_warninglist(testwl['id']) + self.assertEqual(r['Warninglist']['name'], list_name_test) + self.assertTrue('WarninglistEntry' in r['Warninglist']) + r = self.admin_misp_connector.enable_warninglist(testwl['id']) + self.assertEqual(r['success'], '1 warninglist(s) enabled') + r = self.admin_misp_connector.disable_warninglist(testwl['id']) + self.assertEqual(r['success'], '1 warninglist(s) disabled') + + def test_noticelists(self): + # Make sure we're up-to-date self.admin_misp_connector.update_noticelists() r = self.admin_misp_connector.update_noticelists() self.assertEqual(r['name'], 'All noticelists are up to date already.') + # Get list + noticelists = self.admin_misp_connector.get_noticelists() + self.assertTrue(isinstance(noticelists, list)) + list_name_test = 'gdpr' + for nl in noticelists: + if nl['Noticelist']['name'] == list_name_test: + break + testnl = nl + r = self.admin_misp_connector.get_noticelist(testnl['Noticelist']['id']) + self.assertEqual(r['Noticelist']['name'], list_name_test) + self.assertTrue('NoticelistEntry' in r['Noticelist']) + r = self.admin_misp_connector.enable_noticelist(testnl['Noticelist']['id']) + self.assertTrue(r['Noticelist']['enabled']) + r = self.admin_misp_connector.disable_noticelist(testnl['Noticelist']['id']) + self.assertFalse(r['Noticelist']['enabled']) + + def test_galaxies(self): if not travis_run: - # galaxies + # Make sure we're up-to-date self.admin_misp_connector.update_galaxies() r = self.admin_misp_connector.update_galaxies() self.assertEqual(r['name'], 'Galaxies updated.') + # Get list + galaxies = self.admin_misp_connector.get_galaxies() + self.assertTrue(isinstance(galaxies, list)) + list_name_test = 'Mobile Attack - Attack Pattern' + for galaxy in galaxies: + if galaxy['Galaxy']['name'] == list_name_test: + break + r = self.admin_misp_connector.get_galaxy(galaxy['Galaxy']['id']) + self.assertEqual(r['Galaxy']['name'], list_name_test) + self.assertTrue('GalaxyCluster' in r) @unittest.skip("Currently failing") def test_search_type_event_csv(self):