diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index f9c487e0..a9941a41 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -578,7 +578,7 @@ function update_thirdparty { function launch_tests() { tests_dir=${AIL_HOME}/tests bin_dir=${AIL_BIN} - python3 -m nose2 --start-dir $tests_dir --coverage $bin_dir --with-coverage testApi test_modules + python3 -m nose2 --start-dir $tests_dir --coverage $bin_dir --with-coverage test_api test_modules } function reset_password() { diff --git a/tests/testApi.py b/tests/testApi.py deleted file mode 100644 index c1070024..00000000 --- a/tests/testApi.py +++ /dev/null @@ -1,176 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import os -import sys -import time -import unittest - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib import Tag -from packages import Import_helper - -sys.path.append(os.environ['AIL_FLASK']) -sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules')) -from Flask_server import app - - -# def parse_response(obj, ail_response): -# res_json = ail_response.get_json() -# if 'status' in res_json: -# if res_json['status'] == 'error': -# return obj.fail('{}: {}: {}'.format(ail_response.status_code, res_json['status'], res_json['reason'])) -# return res_json -# -# -# def get_api_key(): -# api_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD') -# if os.path.isfile(api_file): -# with open(os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD'), 'r') as f: -# content = f.read() -# content = content.splitlines() -# apikey = content[-1] -# apikey = apikey.replace('API_Key=', '', 1) -# # manual tests -# else: -# apikey = sys.argv[1] -# return apikey -# -# -# APIKEY = get_api_key() -# -# -# class TestApiV1(unittest.TestCase): -# import_uuid = None -# item_id = None -# -# def setUp(self): -# self.app = app -# self.app.config['TESTING'] = True -# self.client = self.app.test_client() -# self.apikey = APIKEY -# self.item_content = "text to import" -# self.item_tags = ["infoleak:analyst-detection=\"private-key\""] -# self.expected_tags = ["infoleak:analyst-detection=\"private-key\"", 'infoleak:submission="manual"'] -# -# # POST /api/v1/import/item -# def test_0001_api_import_item(self): -# input_json = {"type": "text", "tags": self.item_tags, "text": self.item_content} -# req = self.client.post('/api/v1/import/item', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# import_uuid = req_json['uuid'] -# self.__class__.import_uuid = import_uuid -# self.assertTrue(Import_helper.is_valid_uuid_v4(import_uuid)) -# -# # POST /api/v1/get/import/item -# def test_0002_api_get_import_item(self): -# input_json = {"uuid": self.__class__.import_uuid} -# item_not_imported = True -# import_timout = 60 -# start = time.time() -# -# while item_not_imported: -# req = self.client.post('/api/v1/get/import/item', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# if req_json['status'] == 'imported': -# try: -# item_id = req_json['items'][0] -# item_not_imported = False -# except Exception as e: -# if time.time() - start > import_timout: -# item_not_imported = False -# self.fail("Import error: {}".format(req_json)) -# else: -# if time.time() - start > import_timout: -# item_not_imported = False -# self.fail("Import Timeout, import status: {}".format(req_json['status'])) -# self.__class__.item_id = item_id -# -# # Process item -# time.sleep(5) -# -# # POST /api/v1/get/item/content -# def test_0003_api_get_item_content(self): -# input_json = {"id": self.__class__.item_id} -# req = self.client.post('/api/v1/get/item/content', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# item_content = req_json['content'] -# self.assertEqual(item_content, self.item_content) -# -# # POST /api/v1/get/item/tag -# def test_0004_api_get_item_tag(self): -# input_json = {"id": self.__class__.item_id} -# req = self.client.post('/api/v1/get/item/tag', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# item_tags = req_json['tags'] -# self.assertCountEqual(item_tags, self.expected_tags) -# -# # POST /api/v1/get/item/tag -# def test_0005_api_get_item_default(self): -# input_json = {"id": self.__class__.item_id} -# req = self.client.post('/api/v1/get/item/default', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# item_tags = req_json['tags'] -# self.assertCountEqual(item_tags, self.expected_tags) -# item_content = req_json['content'] -# self.assertEqual(item_content, self.item_content) -# -# # POST /api/v1/get/item/tag -# # # TODO: add more test -# def test_0006_api_get_item(self): -# input_json = {"id": self.__class__.item_id, "content": True} -# req = self.client.post('/api/v1/get/item', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# item_tags = req_json['tags'] -# self.assertCountEqual(item_tags, self.expected_tags) -# item_content = req_json['content'] -# self.assertEqual(item_content, self.item_content) -# -# # POST api/v1/add/item/tag -# def test_0007_api_add_item_tag(self): -# tags_to_add = ["infoleak:analyst-detection=\"api-key\""] -# current_item_tag = Tag.get_obj_tag(self.__class__.item_id) -# current_item_tag.append(tags_to_add[0]) -# -# # galaxy_to_add = ["misp-galaxy:stealer=\"Vidar\""] -# input_json = {"id": self.__class__.item_id, "tags": tags_to_add} -# req = self.client.post('/api/v1/add/item/tag', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# item_tags = req_json['tags'] -# self.assertEqual(item_tags, tags_to_add) -# -# new_item_tag = Tag.get_obj_tag(self.__class__.item_id) -# self.assertCountEqual(new_item_tag, current_item_tag) -# -# # DELETE api/v1/delete/item/tag -# def test_0008_api_add_item_tag(self): -# tags_to_delete = ["infoleak:analyst-detection=\"api-key\""] -# input_json = {"id": self.__class__.item_id, "tags": tags_to_delete} -# req = self.client.delete('/api/v1/delete/item/tag', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# item_tags = req_json['tags'] -# self.assertCountEqual(item_tags, tags_to_delete) -# current_item_tag = Tag.get_obj_tag(self.__class__.item_id) -# if tags_to_delete[0] in current_item_tag: -# self.fail('Tag no deleted') -# -# # POST api/v1/get/tag/metadata -# def test_0009_api_add_item_tag(self): -# input_json = {"tag": self.item_tags[0]} -# req = self.client.post('/api/v1/get/tag/metadata', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# self.assertEqual(req_json['tag'], self.item_tags[0]) -# -# # GET api/v1/get/tag/all -# def test_0010_api_add_item_tag(self): -# input_json = {"tag": self.item_tags[0]} -# req = self.client.get('/api/v1/get/tag/all', json=input_json, headers={'Authorization': self.apikey}) -# req_json = parse_response(self, req) -# self.assertTrue(req_json['tags']) -# -# -if __name__ == "__main__": - unittest.main(argv=['first-arg-is-ignored'], exit=False) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 00000000..dc1a6860 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys +import unittest + +from pyail import PyAIL + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from lib import Users + +sys.path.append(os.environ['AIL_FLASK']) +sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules')) + +class TestApiV1(unittest.TestCase): + + def setUp(self): + # TODO GET HOST + PORT + self.ail = PyAIL('https://localhost:7000', Users.get_user_token('admin@admin.test'), ssl=False) + + # GET /api/v1/ping + def test_0001_api_ping(self): + r = self.ail.ping_ail() + self.assertEqual(r.get('status'), 'pong') + + # # GET /api/v1/uuid + # def test_0001_api_uuid(self): + # r = self.ail.get_uuid() + # + # # GET /api/v1/version + # def test_0001_api_version(self): + # r = self.ail.get_version() + + +if __name__ == "__main__": + unittest.main(exit=False) diff --git a/tests/test_modules.py b/tests/test_modules.py index 19fd04a6..0b427697 100644 --- a/tests/test_modules.py +++ b/tests/test_modules.py @@ -33,6 +33,8 @@ ITEMS_FOLDER = Items.ITEMS_FOLDER TESTS_ITEMS_FOLDER = os.path.join(ITEMS_FOLDER, 'tests') sample_dir = os.path.join(os.environ['AIL_HOME'], 'samples') copy_tree(sample_dir, TESTS_ITEMS_FOLDER) + + #### ---- #### class TestModuleApiKey(unittest.TestCase): @@ -53,6 +55,7 @@ class TestModuleApiKey(unittest.TestCase): self.assertCountEqual(matches[1], {aws_access_key}) self.assertCountEqual(matches[2], {aws_secret_key}) + class TestModuleCateg(unittest.TestCase): def setUp(self): @@ -65,9 +68,9 @@ class TestModuleCateg(unittest.TestCase): test_categ = ['CreditCards', 'Mail', 'Onion', 'Urls', 'Credential', 'Cve'] result = self.module.compute(None, r_result=True) - print(result) self.assertCountEqual(result, test_categ) + class TestModuleCreditCards(unittest.TestCase): def setUp(self): @@ -82,11 +85,12 @@ class TestModuleCreditCards(unittest.TestCase): '3547151714018657', # Japan Credit Bureau (JCB) '5492981206527330', # 16 digits MasterCard '4024007132849695', # '4532525919781' # 16-digit VISA, with separators - ] + ] result = self.module.compute('7', r_result=True) self.assertCountEqual(result, test_cards) + class TestModuleDomClassifier(unittest.TestCase): def setUp(self): @@ -100,6 +104,7 @@ class TestModuleDomClassifier(unittest.TestCase): result = self.module.compute(f'{test_host}', r_result=True) self.assertTrue(len(result)) + class TestModuleGlobal(unittest.TestCase): def setUp(self): @@ -119,21 +124,17 @@ class TestModuleGlobal(unittest.TestCase): self.module.obj = Items.Item(item_id) # Test new item result = self.module.compute(item_content_1, r_result=True) - print(f'test new item: {result}') self.assertEqual(result, item_id) # Test duplicate result = self.module.compute(item_content_1, r_result=True) - print(f'test duplicate {result}') self.assertIsNone(result) # Test same id with != content item = Items.Item('tests/2021/01/01/global_831875da824fc86ab5cc0e835755b520.gz') item.delete() result = self.module.compute(item_content_2, r_result=True) - print(f'test same id with != content: {result}') self.assertIn(item_id[:-3], result) - print(result) self.assertNotEqual(result, item_id) # cleanup @@ -141,6 +142,7 @@ class TestModuleGlobal(unittest.TestCase): # item.delete() # # TODO: remove from queue + class TestModuleKeys(unittest.TestCase): def setUp(self): @@ -151,7 +153,8 @@ class TestModuleKeys(unittest.TestCase): item_id = 'tests/2021/01/01/keys.gz' self.module.obj = Items.Item(item_id) # # TODO: check results - result = self.module.compute(None) + self.module.compute(None) + class TestModuleOnion(unittest.TestCase): @@ -162,11 +165,12 @@ class TestModuleOnion(unittest.TestCase): def test_module(self): item_id = 'tests/2021/01/01/onion.gz' self.module.obj = Items.Item(item_id) - domain_1 = 'eswpccgr5xyovsahffkehgleqthrasfpfdblwbs4lstd345dwq5qumqd.onion' - domain_2 = 'www.facebookcorewwwi.onion' + # domain_1 = 'eswpccgr5xyovsahffkehgleqthrasfpfdblwbs4lstd345dwq5qumqd.onion' + # domain_2 = 'www.facebookcorewwwi.onion' self.module.compute(f'3') + class TestModuleTelegram(unittest.TestCase): def setUp(self): @@ -177,7 +181,7 @@ class TestModuleTelegram(unittest.TestCase): item_id = 'tests/2021/01/01/keys.gz' self.module.obj = Items.Item(item_id) # # TODO: check results - result = self.module.compute(None) + self.module.compute(None) if __name__ == '__main__':