mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			177 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			177 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
sys.path.append(os.environ['AIL_BIN'])
 | 
						|
##################################
 | 
						|
# Import Project packages
 | 
						|
##################################
 | 
						|
from lib import Tag
 | 
						|
from packages import Import_helper
 | 
						|
 | 
						|
sys.path.append(os.environ['AIL_FLASK'])
 | 
						|
sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules'))
 | 
						|
from Flask_server import app
 | 
						|
 | 
						|
 | 
						|
# def parse_response(obj, ail_response):
 | 
						|
#     res_json = ail_response.get_json()
 | 
						|
#     if 'status' in res_json:
 | 
						|
#         if res_json['status'] == 'error':
 | 
						|
#             return obj.fail('{}: {}: {}'.format(ail_response.status_code, res_json['status'], res_json['reason']))
 | 
						|
#     return res_json
 | 
						|
#
 | 
						|
#
 | 
						|
# def get_api_key():
 | 
						|
#     api_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')
 | 
						|
#     if os.path.isfile(api_file):
 | 
						|
#         with open(os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD'), 'r') as f:
 | 
						|
#             content = f.read()
 | 
						|
#             content = content.splitlines()
 | 
						|
#             apikey = content[-1]
 | 
						|
#             apikey = apikey.replace('API_Key=', '', 1)
 | 
						|
#     # manual tests
 | 
						|
#     else:
 | 
						|
#         apikey = sys.argv[1]
 | 
						|
#     return apikey
 | 
						|
#
 | 
						|
#
 | 
						|
# APIKEY = get_api_key()
 | 
						|
#
 | 
						|
#
 | 
						|
# class TestApiV1(unittest.TestCase):
 | 
						|
#     import_uuid = None
 | 
						|
#     item_id = None
 | 
						|
#
 | 
						|
#     def setUp(self):
 | 
						|
#         self.app = app
 | 
						|
#         self.app.config['TESTING'] = True
 | 
						|
#         self.client = self.app.test_client()
 | 
						|
#         self.apikey = APIKEY
 | 
						|
#         self.item_content = "text to import"
 | 
						|
#         self.item_tags = ["infoleak:analyst-detection=\"private-key\""]
 | 
						|
#         self.expected_tags = ["infoleak:analyst-detection=\"private-key\"", 'infoleak:submission="manual"']
 | 
						|
#
 | 
						|
#     # POST /api/v1/import/item
 | 
						|
#     def test_0001_api_import_item(self):
 | 
						|
#         input_json = {"type": "text", "tags": self.item_tags, "text": self.item_content}
 | 
						|
#         req = self.client.post('/api/v1/import/item', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         import_uuid = req_json['uuid']
 | 
						|
#         self.__class__.import_uuid = import_uuid
 | 
						|
#         self.assertTrue(Import_helper.is_valid_uuid_v4(import_uuid))
 | 
						|
#
 | 
						|
#     # POST /api/v1/get/import/item
 | 
						|
#     def test_0002_api_get_import_item(self):
 | 
						|
#         input_json = {"uuid": self.__class__.import_uuid}
 | 
						|
#         item_not_imported = True
 | 
						|
#         import_timout = 60
 | 
						|
#         start = time.time()
 | 
						|
#
 | 
						|
#         while item_not_imported:
 | 
						|
#             req = self.client.post('/api/v1/get/import/item', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#             req_json = parse_response(self, req)
 | 
						|
#             if req_json['status'] == 'imported':
 | 
						|
#                 try:
 | 
						|
#                     item_id = req_json['items'][0]
 | 
						|
#                     item_not_imported = False
 | 
						|
#                 except Exception as e:
 | 
						|
#                     if time.time() - start > import_timout:
 | 
						|
#                         item_not_imported = False
 | 
						|
#                         self.fail("Import error: {}".format(req_json))
 | 
						|
#             else:
 | 
						|
#                 if time.time() - start > import_timout:
 | 
						|
#                     item_not_imported = False
 | 
						|
#                     self.fail("Import Timeout, import status: {}".format(req_json['status']))
 | 
						|
#         self.__class__.item_id = item_id
 | 
						|
#
 | 
						|
#         # Process item
 | 
						|
#         time.sleep(5)
 | 
						|
#
 | 
						|
#     # POST /api/v1/get/item/content
 | 
						|
#     def test_0003_api_get_item_content(self):
 | 
						|
#         input_json = {"id": self.__class__.item_id}
 | 
						|
#         req = self.client.post('/api/v1/get/item/content', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         item_content = req_json['content']
 | 
						|
#         self.assertEqual(item_content, self.item_content)
 | 
						|
#
 | 
						|
#     # POST /api/v1/get/item/tag
 | 
						|
#     def test_0004_api_get_item_tag(self):
 | 
						|
#         input_json = {"id": self.__class__.item_id}
 | 
						|
#         req = self.client.post('/api/v1/get/item/tag', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         item_tags = req_json['tags']
 | 
						|
#         self.assertCountEqual(item_tags, self.expected_tags)
 | 
						|
#
 | 
						|
#     # POST /api/v1/get/item/default
 | 
						|
#     def test_0005_api_get_item_default(self):
 | 
						|
#         input_json = {"id": self.__class__.item_id}
 | 
						|
#         req = self.client.post('/api/v1/get/item/default', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         item_tags = req_json['tags']
 | 
						|
#         self.assertCountEqual(item_tags, self.expected_tags)
 | 
						|
#         item_content = req_json['content']
 | 
						|
#         self.assertEqual(item_content, self.item_content)
 | 
						|
#
 | 
						|
#     # POST /api/v1/get/item
 | 
						|
#     # # TODO: add more tests
 | 
						|
#     def test_0006_api_get_item(self):
 | 
						|
#         input_json = {"id": self.__class__.item_id, "content": True}
 | 
						|
#         req = self.client.post('/api/v1/get/item', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         item_tags = req_json['tags']
 | 
						|
#         self.assertCountEqual(item_tags, self.expected_tags)
 | 
						|
#         item_content = req_json['content']
 | 
						|
#         self.assertEqual(item_content, self.item_content)
 | 
						|
#
 | 
						|
#     # POST api/v1/add/item/tag
 | 
						|
#     def test_0007_api_add_item_tag(self):
 | 
						|
#         tags_to_add = ["infoleak:analyst-detection=\"api-key\""]
 | 
						|
#         current_item_tag = Tag.get_obj_tag(self.__class__.item_id)
 | 
						|
#         current_item_tag.append(tags_to_add[0])
 | 
						|
#
 | 
						|
#         # galaxy_to_add = ["misp-galaxy:stealer=\"Vidar\""]
 | 
						|
#         input_json = {"id": self.__class__.item_id, "tags": tags_to_add}
 | 
						|
#         req = self.client.post('/api/v1/add/item/tag', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         item_tags = req_json['tags']
 | 
						|
#         self.assertEqual(item_tags, tags_to_add)
 | 
						|
#
 | 
						|
#         new_item_tag = Tag.get_obj_tag(self.__class__.item_id)
 | 
						|
#         self.assertCountEqual(new_item_tag, current_item_tag)
 | 
						|
#
 | 
						|
#     # DELETE api/v1/delete/item/tag
 | 
						|
#     def test_0008_api_add_item_tag(self):
 | 
						|
#         tags_to_delete = ["infoleak:analyst-detection=\"api-key\""]
 | 
						|
#         input_json = {"id": self.__class__.item_id, "tags": tags_to_delete}
 | 
						|
#         req = self.client.delete('/api/v1/delete/item/tag', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         item_tags = req_json['tags']
 | 
						|
#         self.assertCountEqual(item_tags, tags_to_delete)
 | 
						|
#         current_item_tag = Tag.get_obj_tag(self.__class__.item_id)
 | 
						|
#         if tags_to_delete[0] in current_item_tag:
 | 
						|
#             self.fail('Tag no deleted')
 | 
						|
#
 | 
						|
#     # POST api/v1/get/tag/metadata
 | 
						|
#     def test_0009_api_add_item_tag(self):
 | 
						|
#         input_json = {"tag": self.item_tags[0]}
 | 
						|
#         req = self.client.post('/api/v1/get/tag/metadata', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         self.assertEqual(req_json['tag'], self.item_tags[0])
 | 
						|
#
 | 
						|
#     # GET api/v1/get/tag/all
 | 
						|
#     def test_0010_api_add_item_tag(self):
 | 
						|
#         input_json = {"tag": self.item_tags[0]}
 | 
						|
#         req = self.client.get('/api/v1/get/tag/all', json=input_json, headers={'Authorization': self.apikey})
 | 
						|
#         req_json = parse_response(self, req)
 | 
						|
#         self.assertTrue(req_json['tags'])
 | 
						|
#
 | 
						|
#
 | 
						|
if __name__ == "__main__":
    # Script entry point: run whatever unittest test cases this module defines.
    # unittest.main() treats the first argv element as the program name, so a
    # placeholder list is supplied instead of letting it read sys.argv.
    runner_argv = ['first-arg-is-ignored']
    # exit=False keeps unittest from calling sys.exit() once the run finishes.
    unittest.main(argv=runner_argv, exit=False)
 |