diff --git a/bin/Keys.py b/bin/Keys.py index 4d576591..8d42e6cd 100755 --- a/bin/Keys.py +++ b/bin/Keys.py @@ -17,15 +17,12 @@ RSA private key, certificate messages ################################## import time from enum import Enum -from pubsublogger import publisher - ################################## # Import Project packages ################################## from module.abstract_module import AbstractModule -from packages import Paste -from Helper import Process +from packages.Item import Item class KeyEnum(Enum): @@ -49,7 +46,7 @@ class Keys(AbstractModule): """ Keys module for AIL framework """ - + def __init__(self): super(Keys, self).__init__() 
@@ -58,124 +55,124 @@ class Keys(AbstractModule): def compute(self, message): - paste = Paste.Paste(message) - content = paste.get_p_content() + item = Item(message) + content = item.get_content() find = False get_pgp_content = False if KeyEnum.PGP_MESSAGE.value in content: - self.redis_logger.warning('{} has a PGP enc message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a PGP enc message') - msg = 'infoleak:automatic-detection="pgp-message";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="pgp-message";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') get_pgp_content = True find = True if KeyEnum.PGP_PUBLIC_KEY_BLOCK.value in content: - msg = 'infoleak:automatic-detection="pgp-public-key-block";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="pgp-public-key-block";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') get_pgp_content = True if KeyEnum.PGP_SIGNATURE.value in content: - msg = 'infoleak:automatic-detection="pgp-signature";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="pgp-signature";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') get_pgp_content = True if KeyEnum.CERTIFICATE.value in content: - self.redis_logger.warning('{} has a certificate message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a certificate message') - msg = 'infoleak:automatic-detection="certificate";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="certificate";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.RSA_PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has a RSA private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a RSA private key message') print('rsa private key message 
found') - msg = 'infoleak:automatic-detection="rsa-private-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="rsa-private-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has a private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a private key message') print('private key message found') - msg = 'infoleak:automatic-detection="private-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="private-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.ENCRYPTED_PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has an encrypted private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has an encrypted private key message') print('encrypted private key message found') - msg = 'infoleak:automatic-detection="encrypted-private-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="encrypted-private-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.OPENSSH_PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has an openssh private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has an openssh private key message') print('openssh private key message found') - msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.SSH2_ENCRYPTED_PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has an ssh2 private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has an 
ssh2 private key message') print('SSH2 private key message found') - msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.OPENVPN_STATIC_KEY_V1.value in content: - self.redis_logger.warning('{} has an openssh private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has an openssh private key message') print('OpenVPN Static key message found') - msg = 'infoleak:automatic-detection="vpn-static-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="vpn-static-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.DSA_PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has a dsa private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a dsa private key message') - msg = 'infoleak:automatic-detection="dsa-private-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="dsa-private-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.EC_PRIVATE_KEY.value in content: - self.redis_logger.warning('{} has an ec private key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has an ec private key message') - msg = 'infoleak:automatic-detection="ec-private-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="ec-private-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.PGP_PRIVATE_KEY_BLOCK.value in content: - self.redis_logger.warning('{} has a pgp private key block message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a pgp private key block message') - msg = 
'infoleak:automatic-detection="pgp-private-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="pgp-private-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True if KeyEnum.PUBLIC_KEY.value in content: - self.redis_logger.warning('{} has a public key message'.format(paste.p_name)) + self.redis_logger.warning(f'{item.get_basename()} has a public key message') - msg = 'infoleak:automatic-detection="public-key";{}'.format(message) - self.process.populate_set_out(msg, 'Tags') + msg = f'infoleak:automatic-detection="public-key";{item.get_id()}' + self.send_message_to_queue(msg, 'Tags') find = True # pgp content if get_pgp_content: - self.process.populate_set_out(message, 'PgpDump') + self.send_message_to_queue(item.get_id(), 'PgpDump') if find : #Send to duplicate - self.process.populate_set_out(message, 'Duplicate') - self.redis_logger.debug(message) - + self.send_message_to_queue(item.get_id(), 'Duplicate') + self.redis_logger.debug(f'{item.get_id()} has key(s)') + print(f'{item.get_id()} has key(s)') if __name__ == '__main__': - + module = Keys() module.run() diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 6010ce11..02c4a38b 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -493,7 +493,7 @@ function update_thirdparty { function launch_tests() { tests_dir=${AIL_HOME}/tests bin_dir=${AIL_BIN} - python3 `which nosetests` -w $tests_dir --with-coverage --cover-package=$bin_dir -d + python3 `which nosetests` -w $tests_dir --with-coverage --cover-package=$bin_dir -d #--cover-erase } function reset_password() { diff --git a/bin/packages/Item.py b/bin/packages/Item.py index 0a54b0b7..5ecc85f2 100755 --- a/bin/packages/Item.py +++ b/bin/packages/Item.py 
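Review bot: The OPENVPN_STATIC_KEY_V1 branch still logs `has an openssh private key message` even though the commit rewrites that line into an f-string and the print() right below it says `OpenVPN Static key message found`; it should read `self.redis_logger.warning(f'{item.get_basename()} has an OpenVPN static key message')`. More generally compute() is fourteen near-identical if-blocks that differ only in the KeyEnum member, the tag, the warning text and whether they mark `find`/`get_pgp_content`, which is how a copy-paste slip like this survives review; a class-level rule table drives the same queue messages from one loop so each detector becomes a single data row (the per-branch debug print() calls would fold into that row or be dropped). The PgpDump/Duplicate forwarding at the end is untouched by that rewrite. The class header of Keys is outside the hunk.
```python
    # One row per detector: (KeyEnum member, tag suffix, warning text or None,
    # marks the item as a hit for Duplicate, marks PGP content for PgpDump).
    # Rows keep the order of the original if-chain so queue messages keep their order.
    KEY_RULES = (
        (KeyEnum.PGP_MESSAGE, 'pgp-message', 'has a PGP enc message', True, True),
        (KeyEnum.PGP_PUBLIC_KEY_BLOCK, 'pgp-public-key-block', None, False, True),
        (KeyEnum.PGP_SIGNATURE, 'pgp-signature', None, False, True),
        (KeyEnum.CERTIFICATE, 'certificate', 'has a certificate message', True, False),
        (KeyEnum.RSA_PRIVATE_KEY, 'rsa-private-key', 'has a RSA private key message', True, False),
        # … rows for PRIVATE_KEY, ENCRYPTED_PRIVATE_KEY, OPENSSH_PRIVATE_KEY and
        #     SSH2_ENCRYPTED_PRIVATE_KEY, copied from the if-chain …
        (KeyEnum.OPENVPN_STATIC_KEY_V1, 'vpn-static-key', 'has an OpenVPN static key message', True, False),
        # … rows for DSA_PRIVATE_KEY, EC_PRIVATE_KEY, PGP_PRIVATE_KEY_BLOCK and PUBLIC_KEY …
    )

    def compute(self, message):
        item = Item(message)
        content = item.get_content()
        find = False
        get_pgp_content = False
        for key, tag, warning, is_hit, is_pgp in self.KEY_RULES:
            if key.value not in content:
                continue
            if warning:
                self.redis_logger.warning(f'{item.get_basename()} {warning}')
            self.send_message_to_queue(f'infoleak:automatic-detection="{tag}";{item.get_id()}', 'Tags')
            find = find or is_hit
            get_pgp_content = get_pgp_content or is_pgp
        # … unchanged: PgpDump / Duplicate forwarding and the final debug log …
```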
@@ -599,7 +599,11 @@ class Item(AbstractObject): # # WARNING: UNCLEAN DELETE /!\ TEST ONLY /!\ # TODO: DELETE ITEM CORRELATION + TAGS + METADATA + ... def delete(self): - os.remove(self.get_filename()) + try: + os.remove(self.get_filename()) + return True + except FileNotFoundError: + return False # if __name__ == '__main__': # diff --git a/samples/2018/01/01/keys_certificat_sample.gz b/samples/2018/01/01/keys_certificat_sample.gz deleted file mode 100644 index d3427e10..00000000 Binary files a/samples/2018/01/01/keys_certificat_sample.gz and /dev/null differ diff --git a/tests/testKeys.py b/tests/testKeys.py deleted file mode 100644 index 9dc45c75..00000000 --- a/tests/testKeys.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import sys,os -import unittest -import magic - -sys.path.append(os.environ['AIL_BIN']) - -from packages.Paste import Paste -import Keys as Keys -from Helper import Process -from pubsublogger import publisher - - -class TestKeysModule(unittest.TestCase): - - def setUp(self): - self.paste = Paste('../samples/2018/01/01/keys_certificat_sample.gz') - - # Section name in bin/packages/modules.cfg - self.config_section = 'Keys' - - # Setup the I/O queues - p = Process(self.config_section) - - - def test_search_key(self): - with self.assertRaises(pubsublogger.exceptions.NoChannelError): - Keys.search_key(self.paste) - - def test_search_key(self): - with self.assertRaises(NameError): - publisher.port = 6380 - publisher.channel = 'Script' - Keys.search_key(self.paste) diff --git a/tests/test_modules.py b/tests/test_modules.py index c1a619ac..8ae47792 100644 --- a/tests/test_modules.py +++ b/tests/test_modules.py @@ -16,6 +16,7 @@ from Categ import Categ from CreditCards import CreditCards from DomClassifier import DomClassifier from Global import Global +from Keys import Keys from Onion import Onion # project packages 
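Review bot: delete() now returns True/False but nothing documents that contract, and the WARNING/TODO comment above it still describes the old unconditional remove; only FileNotFoundError is swallowed, so PermissionError and the other OSError subclasses still propagate, which callers that ignore the result (the test suite's `item.delete()` in this same diff) should know. Add a docstring: `"""Remove the item file from disk (test-only, unclean delete). Return True if a file was removed, False if it was already absent; other OSError subclasses propagate."""`. get_filename() is outside the hunk, so whether the path it derives from the item id is confined to the item store cannot be judged here; the existing TEST ONLY warning should stay as prominent as it is.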
@@ -107,7 +108,7 @@ class Test_Module_Global(unittest.TestCase): message = f'{item_id} {item_content_2}' result = self.module_obj.compute(message, r_result=True) print(result) - self.assertIn(result, item_id) + self.assertIn(item_id[:-3], result) self.assertNotEqual(result, item_id) # cleanup @@ -115,6 +116,16 @@ class Test_Module_Global(unittest.TestCase): item.delete() # # TODO: remove from queue +class Test_Module_Keys(unittest.TestCase): + + def setUp(self): + self.module_obj = Keys() + + def test_module(self): + item_id = 'tests/2021/01/01/keys.gz' + # # TODO: check results + result = self.module_obj.compute(item_id) + class Test_Module_Onion(unittest.TestCase): def setUp(self):
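Review bot: The new assertion `self.assertIn(item_id[:-3], result)` leaves unexplained what `[:-3]` strips; if it is the `.gz` extension of the sample id, say so, e.g. `self.assertIn(item_id[:-3], result)  # result carries the id without its '.gz' suffix`, or spell it as `item_id.removesuffix('.gz')` if the suite runs on Python 3.9+. The enclosing test method of Test_Module_Global starts outside the hunk.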
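Review bot: Test_Module_Keys.test_module asserts nothing: `result` is assigned and never checked, and the Keys.compute hunk earlier in this diff shows no return statement, so the test passes whether or not any key is detected. Until the `# TODO: check results` is resolved it should assert on an observable effect, for instance by giving Keys.compute an `r_result=True` path like the one Test_Module_Global already uses, or at least state in a docstring that it is only a smoke test. The commit deletes `samples/2018/01/01/keys_certificat_sample.gz` and tests/testKeys.py but adds no `tests/2021/01/01/keys.gz`; if that file is not already in the tree, `Item(item_id).get_content()` presumably fails before any detection runs.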