#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import sys import time import unittest sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## from lib import Tag from packages import Import_helper sys.path.append(os.environ['AIL_FLASK']) sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules')) from Flask_server import app # def parse_response(obj, ail_response): # res_json = ail_response.get_json() # if 'status' in res_json: # if res_json['status'] == 'error': # return obj.fail('{}: {}: {}'.format(ail_response.status_code, res_json['status'], res_json['reason'])) # return res_json # # # def get_api_key(): # api_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD') # if os.path.isfile(api_file): # with open(os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD'), 'r') as f: # content = f.read() # content = content.splitlines() # apikey = content[-1] # apikey = apikey.replace('API_Key=', '', 1) # # manual tests # else: # apikey = sys.argv[1] # return apikey # # # APIKEY = get_api_key() # # # class TestApiV1(unittest.TestCase): # import_uuid = None # item_id = None # # def setUp(self): # self.app = app # self.app.config['TESTING'] = True # self.client = self.app.test_client() # self.apikey = APIKEY # self.item_content = "text to import" # self.item_tags = ["infoleak:analyst-detection=\"private-key\""] # self.expected_tags = ["infoleak:analyst-detection=\"private-key\"", 'infoleak:submission="manual"'] # # # POST /api/v1/import/item # def test_0001_api_import_item(self): # input_json = {"type": "text", "tags": self.item_tags, "text": self.item_content} # req = self.client.post('/api/v1/import/item', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # import_uuid = req_json['uuid'] # self.__class__.import_uuid = import_uuid # self.assertTrue(Import_helper.is_valid_uuid_v4(import_uuid)) # # # POST /api/v1/get/import/item # def test_0002_api_get_import_item(self): # input_json = {"uuid": self.__class__.import_uuid} # item_not_imported = True # import_timout = 60 # start = time.time() # # while item_not_imported: # req = self.client.post('/api/v1/get/import/item', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # if req_json['status'] == 'imported': # try: # item_id = req_json['items'][0] # item_not_imported = False # except Exception as e: # if time.time() - start > import_timout: # item_not_imported = False # self.fail("Import error: {}".format(req_json)) # else: # if time.time() - start > import_timout: # item_not_imported = False # self.fail("Import Timeout, import status: {}".format(req_json['status'])) # self.__class__.item_id = item_id # # # Process item # time.sleep(5) # # # POST /api/v1/get/item/content # def test_0003_api_get_item_content(self): # input_json = {"id": self.__class__.item_id} # req = self.client.post('/api/v1/get/item/content', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # item_content = req_json['content'] # self.assertEqual(item_content, self.item_content) # # # POST /api/v1/get/item/tag # def test_0004_api_get_item_tag(self): # input_json = {"id": self.__class__.item_id} # req = self.client.post('/api/v1/get/item/tag', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # item_tags = req_json['tags'] # self.assertCountEqual(item_tags, self.expected_tags) # # # POST /api/v1/get/item/tag # def test_0005_api_get_item_default(self): # input_json = {"id": self.__class__.item_id} # req = self.client.post('/api/v1/get/item/default', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # item_tags = req_json['tags'] # self.assertCountEqual(item_tags, self.expected_tags) # item_content = req_json['content'] # self.assertEqual(item_content, self.item_content) # # # POST /api/v1/get/item/tag # # # TODO: add more test # def test_0006_api_get_item(self): # input_json = {"id": self.__class__.item_id, "content": True} # req = self.client.post('/api/v1/get/item', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # item_tags = req_json['tags'] # self.assertCountEqual(item_tags, self.expected_tags) # item_content = req_json['content'] # self.assertEqual(item_content, self.item_content) # # # POST api/v1/add/item/tag # def test_0007_api_add_item_tag(self): # tags_to_add = ["infoleak:analyst-detection=\"api-key\""] # current_item_tag = Tag.get_obj_tag(self.__class__.item_id) # current_item_tag.append(tags_to_add[0]) # # # galaxy_to_add = ["misp-galaxy:stealer=\"Vidar\""] # input_json = {"id": self.__class__.item_id, "tags": tags_to_add} # req = self.client.post('/api/v1/add/item/tag', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # item_tags = req_json['tags'] # self.assertEqual(item_tags, tags_to_add) # # new_item_tag = Tag.get_obj_tag(self.__class__.item_id) # self.assertCountEqual(new_item_tag, current_item_tag) # # # DELETE api/v1/delete/item/tag # def test_0008_api_add_item_tag(self): # tags_to_delete = ["infoleak:analyst-detection=\"api-key\""] # input_json = {"id": self.__class__.item_id, "tags": tags_to_delete} # req = self.client.delete('/api/v1/delete/item/tag', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # item_tags = req_json['tags'] # self.assertCountEqual(item_tags, tags_to_delete) # current_item_tag = Tag.get_obj_tag(self.__class__.item_id) # if tags_to_delete[0] in current_item_tag: # self.fail('Tag no deleted') # # # POST api/v1/get/tag/metadata # def test_0009_api_add_item_tag(self): # input_json = {"tag": self.item_tags[0]} # req = self.client.post('/api/v1/get/tag/metadata', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # self.assertEqual(req_json['tag'], self.item_tags[0]) # # # GET api/v1/get/tag/all # def test_0010_api_add_item_tag(self): # input_json = {"tag": self.item_tags[0]} # req = self.client.get('/api/v1/get/tag/all', json=input_json, headers={'Authorization': self.apikey}) # req_json = parse_response(self, req) # self.assertTrue(req_json['tags']) # # if __name__ == "__main__": unittest.main(argv=['first-arg-is-ignored'], exit=False)