mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			245 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			245 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/env python3
 | |
| # -*-coding:UTF-8 -*
 | |
| """
 | |
| The PgpDump Module
 | |
| ======================
 | |
| 
 | |
| This module extracts IDs (key IDs, names and e-mail addresses) from PGP blocks.
 | |
| 
 | |
| """
 | |
| 
 | |
| ##################################
 | |
| # Import External packages
 | |
| ##################################
 | |
| import os
 | |
| import sys
 | |
| import subprocess
 | |
| 
 | |
| from bs4 import BeautifulSoup
 | |
| from uuid import uuid4
 | |
| 
 | |
| sys.path.append(os.environ['AIL_BIN'])
 | |
| ##################################
 | |
| # Import Project packages
 | |
| ##################################
 | |
| from modules.abstract_module import AbstractModule
 | |
| from lib.objects import Pgps
 | |
| from lib.objects.Items import Item
 | |
| from trackers.Tracker_Term import Tracker_Term
 | |
| from trackers.Tracker_Regex import Tracker_Regex
 | |
| from trackers.Tracker_Yara import Tracker_Yara
 | |
| 
 | |
| 
 | |
class PgpDump(AbstractModule):
    """
    PgpDump module for AIL framework.

    Finds armored PGP blocks (public key, private key, signature, message)
    in an item's content, feeds each block to the external ``pgpdump``
    command and records the key IDs, user names and e-mail addresses found
    in its output.
    """

    def __init__(self):
        """Create the temp directory, define the regexes and reset the state."""
        super().__init__()

        # check/create pgpdump queue directory (used for huge pgp blocks)
        self.pgpdump_dir = os.path.join(os.environ['AIL_HOME'], 'temp', 'pgpdump')
        if not os.path.isdir(self.pgpdump_dir):
            os.makedirs(self.pgpdump_dir)

        # Regex
        self.reg_user_id = r'User ID - .+'
        self.reg_key_id = r'Key ID - .+'
        self.reg_pgp_public_blocs = r'-----BEGIN PGP PUBLIC KEY BLOCK-----[\s\S]+?-----END PGP PUBLIC KEY BLOCK-----'
        self.reg_pgp_private_blocs = r'-----BEGIN PGP PRIVATE KEY BLOCK-----[\s\S]+?-----END PGP PRIVATE KEY BLOCK-----'
        self.reg_pgp_signature = r'-----BEGIN PGP SIGNATURE-----[\s\S]+?-----END PGP SIGNATURE-----'
        self.reg_pgp_message = r'-----BEGIN PGP MESSAGE-----[\s\S]+?-----END PGP MESSAGE-----'
        self.reg_tool_version = r'\bVersion:.*\n'
        self.reg_block_comment = r'\bComment:.*\n'

        # Waiting time in seconds between two processed messages
        self.pending_seconds = 1

        self.tracker_term = Tracker_Term(queue=False)
        self.tracker_regex = Tracker_Regex(queue=False)
        self.tracker_yara = Tracker_Yara(queue=False)

        # Per-item state; compute() resets it for every message.
        self.item_id = None
        self.keys = set()
        self.private_keys = set()
        self.names = set()
        self.mails = set()
        self.symmetrically_encrypted = False

        # Send module state to logs
        self.logger.info(f'Module {self.module_name} initialized')

    def remove_html(self, pgp_block):
        """
        Strip HTML markup from a PGP block.

        :param pgp_block: raw text of the block
        :return: the text content without ``<script>``/``<style>`` elements
                 when at least one tag is found, otherwise ``pgp_block``
                 unchanged (also returned unchanged on ``TypeError``)
        """
        try:
            # Parse once and reuse the tree for both detection and extraction.
            soup = BeautifulSoup(pgp_block, 'html.parser')
            if soup.find() is None:
                # No tag at all: plain text, nothing to strip.
                return pgp_block
            # kill all script and style elements
            for element in soup(["script", "style"]):
                element.extract()
            return soup.get_text()
        except TypeError:
            return pgp_block

    def sanitize_pgp_block(self, pgp_block):
        """
        Normalise an armored PGP block before it is handed to pgpdump.

        Removes HTML markup, ``Version:`` and ``Comment:`` header lines and
        blank lines, then re-inserts one blank line after the first line and
        one before the last line.

        :param pgp_block: raw block as matched in the item content
        :return: the cleaned block, or ``''`` if no non-blank line is left
        """
        # Debug
        print(pgp_block)
        print()
        pgp_block = self.remove_html(pgp_block)
        # Remove Version headers first, then Comment headers; the second
        # search runs on the already-stripped text.
        for header_regex in (self.reg_tool_version, self.reg_block_comment):
            for header in self.regex_findall(header_regex, self.item_id, pgp_block):
                pgp_block = pgp_block.replace(header, '')
        # Remove Empty Lines
        lines = [s for s in pgp_block.splitlines() if (s and not s.isspace())]
        if not lines:
            # Nothing usable left; indexing lines[0] below would raise.
            return ''
        # Blank line after the BEGIN line and before the END line.
        lines[0] = lines[0] + '\n'
        lines[-1] = '\n' + lines[-1]
        pgp_block = '\n'.join(lines)

        # Debug
        print(pgp_block)
        print('-------------------------------------------------------------------------')
        return pgp_block

    def _decode_pgpdump_output(self, output):
        """Decode raw pgpdump output; log and return ``''`` if it is not valid text."""
        try:
            return output.decode()
        except UnicodeDecodeError:
            self.logger.error(f'Error PgpDump UnicodeDecodeError: {self.item_id}')
            return ''

    def get_pgpdump_from_file(self, pgp_block):
        """
        Run pgpdump on a block written to a temporary file.

        :param pgp_block: sanitized armored block
        :return: decoded pgpdump output (``''`` if it cannot be decoded)
        """
        print('Save PGP Block in File')
        filepath = os.path.join(self.pgpdump_dir, str(uuid4()))
        try:
            with open(filepath, 'w') as f:
                f.write(pgp_block)
            process = subprocess.Popen(['pgpdump', filepath], stdout=subprocess.PIPE)
            output = process.communicate()[0]
        finally:
            # Always clean up, even when pgpdump could not be started.
            try:
                os.remove(filepath)
            except FileNotFoundError:
                pass
        return self._decode_pgpdump_output(output)

    def get_pgpdump_from_terminal(self, pgp_block):
        """
        Run pgpdump on a block sent through its standard input.

        The block is written directly to pgpdump's stdin instead of going
        through ``echo -e``, so backslash sequences in untrusted content are
        not reinterpreted and no helper process is left un-reaped.

        :param pgp_block: sanitized armored block
        :return: decoded pgpdump output (``''`` if it cannot be decoded)
        """
        # Trailing newline kept: the previous echo-based pipeline added one.
        payload = (pgp_block + '\n').encode(errors='replace')
        completed = subprocess.run(['pgpdump'], input=payload, stdout=subprocess.PIPE)
        return self._decode_pgpdump_output(completed.stdout)

    def get_pgpdump(self, pgp_block):
        """
        Return the pgpdump output for a block.

        Blocks longer than 131072 characters go through a temporary file,
        smaller ones are piped.
        """
        # NOTE(review): 131072 presumably mirrors the per-argument size limit
        # that applied when the block was passed on a command line - confirm.
        if len(pgp_block) > 131072:
            return self.get_pgpdump_from_file(pgp_block)
        return self.get_pgpdump_from_terminal(pgp_block)

    def extract_id_from_pgpdump_output(self, pgpdump_output):
        """
        Collect names, mails and key IDs from pgpdump output.

        Updates ``self.names``, ``self.mails``, ``self.keys``,
        ``self.private_keys`` and ``self.symmetrically_encrypted``.

        :param pgpdump_output: decoded text produced by pgpdump
        """
        private = 'Secret Key Packet' in pgpdump_output

        users = self.regex_findall(self.reg_user_id, self.item_id, pgpdump_output)
        for user in users:
            # avoid key injection in user_id: drop the matched user line so a
            # crafted 'Key ID - ...' inside it is not picked up below.
            # str.replace returns a new string, so the result must be kept.
            pgpdump_output = pgpdump_output.replace(user, '', 1)
            user = user.replace('User ID - ', '', 1)
            if ' <' in user:
                name, mail = user.rsplit(' <', 1)
                # Strip the closing bracket only when it is really there.
                if mail.endswith('>'):
                    mail = mail[:-1]
                self.names.add(name)
                self.mails.add(mail)
            else:
                self.names.add(user)

        keys = self.regex_findall(self.reg_key_id, self.item_id, pgpdump_output)
        for key_id in keys:
            key_id = key_id.replace('Key ID - ', '', 1)
            if key_id != '0x0000000000000000':
                self.keys.add(key_id)
                if private:
                    self.private_keys.add(key_id)
            else:
                # An all-zero key ID is recorded as symmetric encryption.
                self.symmetrically_encrypted = True
                print('symmetrically encrypted')

    def _run_trackers(self, value, subtype):
        """Submit a PGP name or mail to the term, regex and yara trackers."""
        for tracker in (self.tracker_term, self.tracker_regex, self.tracker_yara):
            tracker.compute(value, obj_type='pgp', subtype=subtype)

    def compute(self, message):
        """
        Process one item: extract its PGP blocks and record what they contain.

        :param message: value used to build the ``Item`` to analyse
        """
        item = Item(message)
        self.item_id = item.get_id()
        content = item.get_content()

        # Public keys, private keys, signatures, then messages.
        block_regexes = (
            self.reg_pgp_public_blocs,
            self.reg_pgp_private_blocs,
            self.reg_pgp_signature,
            self.reg_pgp_message,
        )
        pgp_blocks = []
        for block_regex in block_regexes:
            for pgp_block in self.regex_findall(block_regex, self.item_id, content):
                pgp_block = self.sanitize_pgp_block(pgp_block)
                if pgp_block:
                    pgp_blocks.append(pgp_block)

        # Reset per-item state before parsing the blocks of this item.
        self.symmetrically_encrypted = False
        self.keys = set()
        self.private_keys = set()
        self.names = set()
        self.mails = set()
        for pgp_block in pgp_blocks:
            pgpdump_output = self.get_pgpdump(pgp_block)
            self.extract_id_from_pgpdump_output(pgpdump_output)

        if self.keys or self.names or self.mails:
            print(self.item_id)
            date = item.get_date()
            for key in self.keys:
                pgp = Pgps.Pgp(key, 'key')
                pgp.add(date, self.item_id)
                print(f'    key: {key}')
            for name in self.names:
                pgp = Pgps.Pgp(name, 'name')
                pgp.add(date, self.item_id)
                print(f'    name: {name}')
                self._run_trackers(name, 'name')
            for mail in self.mails:
                pgp = Pgps.Pgp(mail, 'mail')
                pgp.add(date, self.item_id)
                print(f'    mail: {mail}')
                self._run_trackers(mail, 'mail')

            # Keys extracted from PGP PRIVATE KEY BLOCK
            for key in self.private_keys:
                pgp = Pgps.Pgp(key, 'key')
                pgp.add_tag('infoleak:automatic-detection="pgp-private-key"')
                print(f'    private key: {key}')

        if self.symmetrically_encrypted:
            msg = f'infoleak:automatic-detection="pgp-symmetric";{self.item_id}'
            self.add_message_to_queue(msg, 'Tags')
| 
 | |
| 
 | |
if __name__ == '__main__':
    # Script entry point: instantiate the module and call its run() method.
    PgpDump().run()
| 
 |