mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			172 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			172 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/env python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
 | 
						|
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'bin'))
 | 
						|
sys.path.append(os.environ['AIL_FLASK'])
 | 
						|
sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules'))
 | 
						|
 | 
						|
import Import_helper
 | 
						|
import Tag
 | 
						|
 | 
						|
from Flask_server import app
 | 
						|
 | 
						|
def parse_response(obj, ail_response):
    """Decode the JSON body of an API response, failing the test on errors.

    Args:
        obj: Object exposing ``fail(message)``; the tests in this file pass
            the running ``unittest.TestCase``.
        ail_response: Response object providing ``get_json()`` and
            ``status_code``.

    Returns:
        The decoded JSON body. When the body is not JSON, or when it is a
        dict whose ``status`` is ``'error'``, the result of ``obj.fail(...)``
        is returned instead (``TestCase.fail`` raises, so a real test stops
        there).
    """
    res_json = ail_response.get_json()

    # get_json() yields None when the body is not JSON; report that as a
    # readable failure rather than a TypeError on the membership test below.
    if res_json is None:
        return obj.fail('{}: response body is not JSON'.format(ail_response.status_code))

    if isinstance(res_json, dict) and res_json.get('status') == 'error':
        # 'reason' is read with .get() so that a missing key does not hide
        # the API error behind a KeyError.
        return obj.fail('{}: {}: {}'.format(ail_response.status_code, res_json['status'], res_json.get('reason')))

    return res_json
 | 
						|
 | 
						|
def get_api_key():
    """Return the API key used to authenticate the test requests.

    The key is taken from the last non-blank line of
    ``$AIL_HOME/DEFAULT_PASSWORD``, with a leading ``API_Key=`` removed.
    When that file does not exist (manual test runs) the key is read from
    the first command-line argument instead.

    Returns:
        The API key as a string.

    Raises:
        KeyError: ``AIL_HOME`` is not set in the environment.
        ValueError: the password file exists but holds no non-blank line.
        IndexError: the file is missing and no command-line argument was given.
    """
    api_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')

    # manual tests
    if not os.path.isfile(api_file):
        return sys.argv[1]

    with open(api_file, 'r') as f:
        stripped = (line.strip() for line in f.read().splitlines())
        # Ignore blank lines so a trailing empty line cannot yield an empty key.
        lines = [line for line in stripped if line]

    if not lines:
        raise ValueError('No API key found in {}'.format(api_file))

    return lines[-1].replace('API_Key=', '', 1)
 | 
						|
 | 
						|
# API key resolved once at import time; TestApiV1.setUp copies it to self.apikey.
APIKEY = get_api_key()
 | 
						|
 | 
						|
class TestApiV1(unittest.TestCase):
    """Tests of the ``/api/v1`` item and tag endpoints via the Flask test client.

    The tests depend on each other: the numeric prefix in every method name
    fixes the execution order under unittest's default name-based sorting,
    and the import UUID / item id produced by the first two tests are handed
    to the later ones through class attributes.
    """

    # UUID returned by the import request (set by test_0001).
    import_uuid = None
    # Id of the imported item (set by test_0002).
    item_id = None
    # Maximum time, in seconds, to wait for the import to complete.
    import_timeout = 60
    # Pause, in seconds, between two import-status polls.
    import_poll_interval = 0.5

    def setUp(self):
        """Create a test client and the fixtures shared by every test."""
        self.app = app
        self.app.config['TESTING'] = True
        self.client = self.app.test_client()
        self.apikey = APIKEY
        self.item_content = "text to import"
        self.item_tags = ["infoleak:analyst-detection=\"private-key\""]
        # Tags the tests expect on the item once it has been imported.
        self.expected_tags = ["infoleak:analyst-detection=\"private-key\"", 'infoleak:submission="manual"']

    def _call_api(self, method, endpoint, payload):
        """Send an authenticated JSON request and return the parsed response.

        ``method`` is the name of the test-client method to use
        (``'post'``, ``'get'`` or ``'delete'``).
        """
        send = getattr(self.client, method)
        response = send(endpoint, json=payload, headers={'Authorization': self.apikey})
        return parse_response(self, response)

    def _shared_item_id(self):
        """Return the item id stored by test_0002, failing clearly if unset."""
        item_id = self.__class__.item_id
        if item_id is None:
            self.fail('No imported item id available: test_0002_api_get_import_item must succeed first')
        return item_id

    # POST /api/v1/import/item
    def test_0001_api_import_item(self):
        """Submit a text item for import and store the returned UUID."""
        input_json = {"type": "text", "tags": self.item_tags, "text": self.item_content}
        req_json = self._call_api('post', '/api/v1/import/item', input_json)
        import_uuid = req_json['uuid']
        self.__class__.import_uuid = import_uuid
        self.assertTrue(Import_helper.is_valid_uuid_v4(import_uuid))

    # POST /api/v1/get/import/item
    def test_0002_api_get_import_item(self):
        """Poll the import status until an item id is reported, then store it."""
        import_uuid = self.__class__.import_uuid
        if import_uuid is None:
            self.fail('No import UUID available: test_0001_api_import_item must succeed first')

        input_json = {"uuid": import_uuid}
        start = time.time()
        item_id = None

        while item_id is None:
            req_json = self._call_api('post', '/api/v1/get/import/item', input_json)
            timed_out = time.time() - start > self.import_timeout

            if req_json['status'] == 'imported':
                try:
                    item_id = req_json['items'][0]
                except (KeyError, IndexError, TypeError):
                    # Status is 'imported' but no item is listed yet: keep
                    # polling until the timeout expires.
                    if timed_out:
                        self.fail("Import error: {}".format(req_json))
            elif timed_out:
                self.fail("Import Timeout, import status: {}".format(req_json['status']))

            if item_id is None:
                # Avoid hammering the API while the import is still pending.
                time.sleep(self.import_poll_interval)

        self.__class__.item_id = item_id

        # Process item
        time.sleep(5)

    # POST /api/v1/get/item/content
    def test_0003_api_get_item_content(self):
        """The stored content must equal the submitted text."""
        input_json = {"id": self._shared_item_id()}
        req_json = self._call_api('post', '/api/v1/get/item/content', input_json)
        self.assertEqual(req_json['content'], self.item_content)

    # POST /api/v1/get/item/tag
    def test_0004_api_get_item_tag(self):
        """The item must carry exactly the expected tags."""
        input_json = {"id": self._shared_item_id()}
        req_json = self._call_api('post', '/api/v1/get/item/tag', input_json)
        self.assertCountEqual(req_json['tags'], self.expected_tags)

    # POST /api/v1/get/item/default
    def test_0005_api_get_item_default(self):
        """The default item view must return the expected tags and content."""
        input_json = {"id": self._shared_item_id()}
        req_json = self._call_api('post', '/api/v1/get/item/default', input_json)
        self.assertCountEqual(req_json['tags'], self.expected_tags)
        self.assertEqual(req_json['content'], self.item_content)

    # POST /api/v1/get/item
    # # TODO: add more test
    def test_0006_api_get_item(self):
        """Requesting the item with ``content`` enabled returns tags and content."""
        input_json = {"id": self._shared_item_id(), "content": True}
        req_json = self._call_api('post', '/api/v1/get/item', input_json)
        self.assertCountEqual(req_json['tags'], self.expected_tags)
        self.assertEqual(req_json['content'], self.item_content)

    # POST api/v1/add/item/tag
    def test_0007_api_add_item_tag(self):
        """Adding a tag returns it and extends the item's stored tags."""
        item_id = self._shared_item_id()
        tags_to_add = ["infoleak:analyst-detection=\"api-key\""]

        # Tags expected after the request: the current ones plus the new tag.
        expected_item_tag = Tag.get_obj_tag(item_id)
        expected_item_tag.append(tags_to_add[0])

        # TODO: also cover galaxy tags, e.g. misp-galaxy:stealer="Vidar"
        input_json = {"id": item_id, "tags": tags_to_add}
        req_json = self._call_api('post', '/api/v1/add/item/tag', input_json)
        self.assertEqual(req_json['tags'], tags_to_add)

        new_item_tag = Tag.get_obj_tag(item_id)
        self.assertCountEqual(new_item_tag, expected_item_tag)

    # DELETE api/v1/delete/item/tag
    def test_0008_api_add_item_tag(self):
        """Delete the tag added by test_0007.

        The method name is a historical copy-paste; it is kept unchanged so
        existing test selections by name keep working.
        """
        item_id = self._shared_item_id()
        tags_to_delete = ["infoleak:analyst-detection=\"api-key\""]
        input_json = {"id": item_id, "tags": tags_to_delete}
        req_json = self._call_api('delete', '/api/v1/delete/item/tag', input_json)
        self.assertCountEqual(req_json['tags'], tags_to_delete)

        current_item_tag = Tag.get_obj_tag(item_id)
        self.assertNotIn(tags_to_delete[0], current_item_tag, 'Tag no deleted')

    # POST api/v1/get/tag/metadata
    def test_0009_api_add_item_tag(self):
        """Fetch the metadata of a tag (name kept for compatibility)."""
        input_json = {"tag": self.item_tags[0]}
        req_json = self._call_api('post', '/api/v1/get/tag/metadata', input_json)
        self.assertEqual(req_json['tag'], self.item_tags[0])

    # GET api/v1/get/tag/all
    def test_0010_api_add_item_tag(self):
        """List all tags; the list must not be empty (name kept for compatibility)."""
        input_json = {"tag": self.item_tags[0]}
        req_json = self._call_api('get', '/api/v1/get/tag/all', input_json)
        self.assertTrue(req_json['tags'])
 | 
						|
 | 
						|
if __name__ == "__main__":
    # A fixed argv is handed to unittest so it does not parse the real
    # command line, whose first argument get_api_key() may use as the API key.
    # exit=False keeps unittest.main() from calling sys.exit() afterwards.
    unittest.main(exit=False, argv=['first-arg-is-ignored'])
 |