mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			chg: [test] add basic API test
							parent
							
								
									775b7fa868
								
							
						
					
					
						commit
						afe13185d9
					
				|  | @ -578,7 +578,7 @@ function update_thirdparty { | |||
| function launch_tests() { | ||||
|   tests_dir=${AIL_HOME}/tests | ||||
|   bin_dir=${AIL_BIN} | ||||
|   python3 -m nose2 --start-dir $tests_dir --coverage $bin_dir --with-coverage testApi test_modules | ||||
|   python3 -m nose2 --start-dir $tests_dir --coverage $bin_dir --with-coverage test_api test_modules | ||||
| } | ||||
| 
 | ||||
| function reset_password() { | ||||
|  |  | |||
							
								
								
									
										176
									
								
								tests/testApi.py
								
								
								
								
							
							
						
						
									
										176
									
								
								tests/testApi.py
								
								
								
								
							|  | @ -1,176 +0,0 @@ | |||
| #!/usr/bin/env python3 | ||||
| # -*- coding: utf-8 -*- | ||||
| 
 | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| import unittest | ||||
| 
 | ||||
| sys.path.append(os.environ['AIL_BIN']) | ||||
| ################################## | ||||
| # Import Project packages | ||||
| ################################## | ||||
| from lib import Tag | ||||
| from packages import Import_helper | ||||
| 
 | ||||
| sys.path.append(os.environ['AIL_FLASK']) | ||||
| sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules')) | ||||
| from Flask_server import app | ||||
| 
 | ||||
| 
 | ||||
| # def parse_response(obj, ail_response): | ||||
| #     res_json = ail_response.get_json() | ||||
| #     if 'status' in res_json: | ||||
| #         if res_json['status'] == 'error': | ||||
| #             return obj.fail('{}: {}: {}'.format(ail_response.status_code, res_json['status'], res_json['reason'])) | ||||
| #     return res_json | ||||
| # | ||||
| # | ||||
| # def get_api_key(): | ||||
| #     api_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD') | ||||
| #     if os.path.isfile(api_file): | ||||
| #         with open(os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD'), 'r') as f: | ||||
| #             content = f.read() | ||||
| #             content = content.splitlines() | ||||
| #             apikey = content[-1] | ||||
| #             apikey = apikey.replace('API_Key=', '', 1) | ||||
| #     # manual tests | ||||
| #     else: | ||||
| #         apikey = sys.argv[1] | ||||
| #     return apikey | ||||
| # | ||||
| # | ||||
| # APIKEY = get_api_key() | ||||
| # | ||||
| # | ||||
| # class TestApiV1(unittest.TestCase): | ||||
| #     import_uuid = None | ||||
| #     item_id = None | ||||
| # | ||||
| #     def setUp(self): | ||||
| #         self.app = app | ||||
| #         self.app.config['TESTING'] = True | ||||
| #         self.client = self.app.test_client() | ||||
| #         self.apikey = APIKEY | ||||
| #         self.item_content = "text to import" | ||||
| #         self.item_tags = ["infoleak:analyst-detection=\"private-key\""] | ||||
| #         self.expected_tags = ["infoleak:analyst-detection=\"private-key\"", 'infoleak:submission="manual"'] | ||||
| # | ||||
| #     # POST /api/v1/import/item | ||||
| #     def test_0001_api_import_item(self): | ||||
| #         input_json = {"type": "text", "tags": self.item_tags, "text": self.item_content} | ||||
| #         req = self.client.post('/api/v1/import/item', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         import_uuid = req_json['uuid'] | ||||
| #         self.__class__.import_uuid = import_uuid | ||||
| #         self.assertTrue(Import_helper.is_valid_uuid_v4(import_uuid)) | ||||
| # | ||||
| #     # POST /api/v1/get/import/item | ||||
| #     def test_0002_api_get_import_item(self): | ||||
| #         input_json = {"uuid": self.__class__.import_uuid} | ||||
| #         item_not_imported = True | ||||
| #         import_timout = 60 | ||||
| #         start = time.time() | ||||
| # | ||||
| #         while item_not_imported: | ||||
| #             req = self.client.post('/api/v1/get/import/item', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #             req_json = parse_response(self, req) | ||||
| #             if req_json['status'] == 'imported': | ||||
| #                 try: | ||||
| #                     item_id = req_json['items'][0] | ||||
| #                     item_not_imported = False | ||||
| #                 except Exception as e: | ||||
| #                     if time.time() - start > import_timout: | ||||
| #                         item_not_imported = False | ||||
| #                         self.fail("Import error: {}".format(req_json)) | ||||
| #             else: | ||||
| #                 if time.time() - start > import_timout: | ||||
| #                     item_not_imported = False | ||||
| #                     self.fail("Import Timeout, import status: {}".format(req_json['status'])) | ||||
| #         self.__class__.item_id = item_id | ||||
| # | ||||
| #         # Process item | ||||
| #         time.sleep(5) | ||||
| # | ||||
| #     # POST /api/v1/get/item/content | ||||
| #     def test_0003_api_get_item_content(self): | ||||
| #         input_json = {"id": self.__class__.item_id} | ||||
| #         req = self.client.post('/api/v1/get/item/content', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         item_content = req_json['content'] | ||||
| #         self.assertEqual(item_content, self.item_content) | ||||
| # | ||||
| #     # POST /api/v1/get/item/tag | ||||
| #     def test_0004_api_get_item_tag(self): | ||||
| #         input_json = {"id": self.__class__.item_id} | ||||
| #         req = self.client.post('/api/v1/get/item/tag', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         item_tags = req_json['tags'] | ||||
| #         self.assertCountEqual(item_tags, self.expected_tags) | ||||
| # | ||||
| #     # POST /api/v1/get/item/tag | ||||
| #     def test_0005_api_get_item_default(self): | ||||
| #         input_json = {"id": self.__class__.item_id} | ||||
| #         req = self.client.post('/api/v1/get/item/default', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         item_tags = req_json['tags'] | ||||
| #         self.assertCountEqual(item_tags, self.expected_tags) | ||||
| #         item_content = req_json['content'] | ||||
| #         self.assertEqual(item_content, self.item_content) | ||||
| # | ||||
| #     # POST /api/v1/get/item/tag | ||||
| #     # # TODO: add more test | ||||
| #     def test_0006_api_get_item(self): | ||||
| #         input_json = {"id": self.__class__.item_id, "content": True} | ||||
| #         req = self.client.post('/api/v1/get/item', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         item_tags = req_json['tags'] | ||||
| #         self.assertCountEqual(item_tags, self.expected_tags) | ||||
| #         item_content = req_json['content'] | ||||
| #         self.assertEqual(item_content, self.item_content) | ||||
| # | ||||
| #     # POST api/v1/add/item/tag | ||||
| #     def test_0007_api_add_item_tag(self): | ||||
| #         tags_to_add = ["infoleak:analyst-detection=\"api-key\""] | ||||
| #         current_item_tag = Tag.get_obj_tag(self.__class__.item_id) | ||||
| #         current_item_tag.append(tags_to_add[0]) | ||||
| # | ||||
| #         # galaxy_to_add = ["misp-galaxy:stealer=\"Vidar\""] | ||||
| #         input_json = {"id": self.__class__.item_id, "tags": tags_to_add} | ||||
| #         req = self.client.post('/api/v1/add/item/tag', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         item_tags = req_json['tags'] | ||||
| #         self.assertEqual(item_tags, tags_to_add) | ||||
| # | ||||
| #         new_item_tag = Tag.get_obj_tag(self.__class__.item_id) | ||||
| #         self.assertCountEqual(new_item_tag, current_item_tag) | ||||
| # | ||||
| #     # DELETE api/v1/delete/item/tag | ||||
| #     def test_0008_api_add_item_tag(self): | ||||
| #         tags_to_delete = ["infoleak:analyst-detection=\"api-key\""] | ||||
| #         input_json = {"id": self.__class__.item_id, "tags": tags_to_delete} | ||||
| #         req = self.client.delete('/api/v1/delete/item/tag', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         item_tags = req_json['tags'] | ||||
| #         self.assertCountEqual(item_tags, tags_to_delete) | ||||
| #         current_item_tag = Tag.get_obj_tag(self.__class__.item_id) | ||||
| #         if tags_to_delete[0] in current_item_tag: | ||||
| #             self.fail('Tag no deleted') | ||||
| # | ||||
| #     # POST api/v1/get/tag/metadata | ||||
| #     def test_0009_api_add_item_tag(self): | ||||
| #         input_json = {"tag": self.item_tags[0]} | ||||
| #         req = self.client.post('/api/v1/get/tag/metadata', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         self.assertEqual(req_json['tag'], self.item_tags[0]) | ||||
| # | ||||
| #     # GET api/v1/get/tag/all | ||||
| #     def test_0010_api_add_item_tag(self): | ||||
| #         input_json = {"tag": self.item_tags[0]} | ||||
| #         req = self.client.get('/api/v1/get/tag/all', json=input_json, headers={'Authorization': self.apikey}) | ||||
| #         req_json = parse_response(self, req) | ||||
| #         self.assertTrue(req_json['tags']) | ||||
| # | ||||
| # | ||||
| if __name__ == "__main__": | ||||
|     unittest.main(argv=['first-arg-is-ignored'], exit=False) | ||||
|  | @ -0,0 +1,40 @@ | |||
| #!/usr/bin/env python3 | ||||
| # -*- coding: utf-8 -*- | ||||
| 
 | ||||
| import os | ||||
| import sys | ||||
| import unittest | ||||
| 
 | ||||
| from pyail import PyAIL | ||||
| 
 | ||||
| sys.path.append(os.environ['AIL_BIN']) | ||||
| ################################## | ||||
| # Import Project packages | ||||
| ################################## | ||||
| from lib import Users | ||||
| 
 | ||||
| sys.path.append(os.environ['AIL_FLASK']) | ||||
| sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules')) | ||||
| 
 | ||||
| class TestApiV1(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|         # TODO GET HOST + PORT | ||||
|         self.ail = PyAIL('https://localhost:7000', Users.get_user_token('admin@admin.test'), ssl=False) | ||||
| 
 | ||||
|     # GET /api/v1/ping | ||||
|     def test_0001_api_ping(self): | ||||
|         r = self.ail.ping_ail() | ||||
|         self.assertEqual(r.get('status'), 'pong') | ||||
| 
 | ||||
|     # # GET /api/v1/uuid | ||||
|     # def test_0001_api_uuid(self): | ||||
|     #     r = self.ail.get_uuid() | ||||
|     # | ||||
|     # # GET /api/v1/version | ||||
|     # def test_0001_api_version(self): | ||||
|     #     r = self.ail.get_version() | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     unittest.main(exit=False) | ||||
|  | @ -33,6 +33,8 @@ ITEMS_FOLDER = Items.ITEMS_FOLDER | |||
| TESTS_ITEMS_FOLDER = os.path.join(ITEMS_FOLDER, 'tests') | ||||
| sample_dir = os.path.join(os.environ['AIL_HOME'], 'samples') | ||||
| copy_tree(sample_dir, TESTS_ITEMS_FOLDER) | ||||
| 
 | ||||
| 
 | ||||
| #### ---- #### | ||||
| 
 | ||||
| class TestModuleApiKey(unittest.TestCase): | ||||
|  | @ -53,6 +55,7 @@ class TestModuleApiKey(unittest.TestCase): | |||
|         self.assertCountEqual(matches[1], {aws_access_key}) | ||||
|         self.assertCountEqual(matches[2], {aws_secret_key}) | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleCateg(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|  | @ -65,9 +68,9 @@ class TestModuleCateg(unittest.TestCase): | |||
|         test_categ = ['CreditCards', 'Mail', 'Onion', 'Urls', 'Credential', 'Cve'] | ||||
| 
 | ||||
|         result = self.module.compute(None, r_result=True) | ||||
|         print(result) | ||||
|         self.assertCountEqual(result, test_categ) | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleCreditCards(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|  | @ -82,11 +85,12 @@ class TestModuleCreditCards(unittest.TestCase): | |||
|                       '3547151714018657',  # Japan Credit Bureau (JCB) | ||||
|                       '5492981206527330',  # 16 digits MasterCard | ||||
|                       '4024007132849695',  # '4532525919781' # 16-digit VISA, with separators | ||||
|                      ] | ||||
|                       ] | ||||
| 
 | ||||
|         result = self.module.compute('7', r_result=True) | ||||
|         self.assertCountEqual(result, test_cards) | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleDomClassifier(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|  | @ -100,6 +104,7 @@ class TestModuleDomClassifier(unittest.TestCase): | |||
|         result = self.module.compute(f'{test_host}', r_result=True) | ||||
|         self.assertTrue(len(result)) | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleGlobal(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|  | @ -119,21 +124,17 @@ class TestModuleGlobal(unittest.TestCase): | |||
|         self.module.obj = Items.Item(item_id) | ||||
|         # Test new item | ||||
|         result = self.module.compute(item_content_1, r_result=True) | ||||
|         print(f'test new item: {result}') | ||||
|         self.assertEqual(result, item_id) | ||||
| 
 | ||||
|         # Test duplicate | ||||
|         result = self.module.compute(item_content_1, r_result=True) | ||||
|         print(f'test duplicate {result}') | ||||
|         self.assertIsNone(result) | ||||
| 
 | ||||
|         # Test same id with != content | ||||
|         item = Items.Item('tests/2021/01/01/global_831875da824fc86ab5cc0e835755b520.gz') | ||||
|         item.delete() | ||||
|         result = self.module.compute(item_content_2, r_result=True) | ||||
|         print(f'test same id with != content: {result}') | ||||
|         self.assertIn(item_id[:-3], result) | ||||
|         print(result) | ||||
|         self.assertNotEqual(result, item_id) | ||||
| 
 | ||||
|         # cleanup | ||||
|  | @ -141,6 +142,7 @@ class TestModuleGlobal(unittest.TestCase): | |||
|         # item.delete() | ||||
|         # # TODO: remove from queue | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleKeys(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|  | @ -151,7 +153,8 @@ class TestModuleKeys(unittest.TestCase): | |||
|         item_id = 'tests/2021/01/01/keys.gz' | ||||
|         self.module.obj = Items.Item(item_id) | ||||
|         # # TODO: check results | ||||
|         result = self.module.compute(None) | ||||
|         self.module.compute(None) | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleOnion(unittest.TestCase): | ||||
| 
 | ||||
|  | @ -162,11 +165,12 @@ class TestModuleOnion(unittest.TestCase): | |||
|     def test_module(self): | ||||
|         item_id = 'tests/2021/01/01/onion.gz' | ||||
|         self.module.obj = Items.Item(item_id) | ||||
|         domain_1 = 'eswpccgr5xyovsahffkehgleqthrasfpfdblwbs4lstd345dwq5qumqd.onion' | ||||
|         domain_2 = 'www.facebookcorewwwi.onion' | ||||
|         # domain_1 = 'eswpccgr5xyovsahffkehgleqthrasfpfdblwbs4lstd345dwq5qumqd.onion' | ||||
|         # domain_2 = 'www.facebookcorewwwi.onion' | ||||
| 
 | ||||
|         self.module.compute(f'3') | ||||
| 
 | ||||
| 
 | ||||
| class TestModuleTelegram(unittest.TestCase): | ||||
| 
 | ||||
|     def setUp(self): | ||||
|  | @ -177,7 +181,7 @@ class TestModuleTelegram(unittest.TestCase): | |||
|         item_id = 'tests/2021/01/01/keys.gz' | ||||
|         self.module.obj = Items.Item(item_id) | ||||
|         # # TODO: check results | ||||
|         result = self.module.compute(None) | ||||
|         self.module.compute(None) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 terrtia
						terrtia