chg: [ApiKey] refactor module + tests

pull/569/head
Terrtia 2021-05-19 14:54:34 +02:00
parent 4896db98a3
commit 4a9bda2ee8
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
3 changed files with 89 additions and 77 deletions

View File

@ -7,85 +7,84 @@ The ApiKey Module
This module is consuming the Redis-list created by the Categ module.
It apply API_key regexes on paste content and warn if above a threshold.
Search for API keys on an item content.
"""
import redis
import pprint
import time
import re
from packages import Paste
from packages import lib_refine
from pubsublogger import publisher
# project packages
from module.abstract_module import AbstractModule
from packages.Item import Item
from lib import regex_helper
from Helper import Process
class ApiKey(AbstractModule):
"""ApiKey module for AIL framework"""
def __init__(self):
super(ApiKey, self).__init__()
def search_api_key(message):
filename, score = message.split()
paste = Paste.Paste(filename)
content = paste.get_p_content()
self.redis_cache_key = regex_helper.generate_redis_cache_key(self.module_name)
aws_access_key = regex_aws_access_key.findall(content)
aws_secret_key = regex_aws_secret_key.findall(content)
google_api_key = regex_google_api_key.findall(content)
# # TODO: ENUM or dict
if(len(aws_access_key) > 0 or len(aws_secret_key) > 0 or len(google_api_key) > 0):
# TODO improve REGEX
#r'(?<![A-Z0-9])=[A-Z0-9]{20}(?![A-Z0-9])'
#r'(?<!=[A-Za-z0-9+])=[A-Za-z0-9+]{40}(?![A-Za-z0-9+])'
self.re_aws_access_key = r'AKIA[0-9A-Z]{16}'
self.re_aws_secret_key = r'[0-9a-zA-Z/+]{40}'
re.compile(self.re_aws_access_key)
re.compile(self.re_aws_secret_key)
to_print = 'ApiKey;{};{};{};'.format(
paste.p_source, paste.p_date, paste.p_name)
if(len(google_api_key) > 0):
print('found google api key')
print(to_print)
publisher.warning('{}Checked {} found Google API Key;{}'.format(
to_print, len(google_api_key), paste.p_rel_path))
msg = 'infoleak:automatic-detection="google-api-key";{}'.format(filename)
p.populate_set_out(msg, 'Tags')
# r'=AIza[0-9a-zA-Z-_]{35}' keep equal ????
self.re_google_api_key = r'AIza[0-9a-zA-Z-_]{35}'
re.compile(self.re_google_api_key)
if(len(aws_access_key) > 0 or len(aws_secret_key) > 0):
print('found AWS key')
print(to_print)
total = len(aws_access_key) + len(aws_secret_key)
publisher.warning('{}Checked {} found AWS Key;{}'.format(
to_print, total, paste.p_rel_path))
msg = 'infoleak:automatic-detection="aws-key";{}'.format(filename)
p.populate_set_out(msg, 'Tags')
# Send module state to logs
self.redis_logger.info(f"Module {self.module_name} initialized")
def compute(self, message, r_match=False):
id, score = message.split()
item = Item(id)
item_content = item.get_content()
msg = 'infoleak:automatic-detection="api-key";{}'.format(filename)
p.populate_set_out(msg, 'Tags')
google_api_key = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.re_google_api_key, item.get_id(), item_content)
#Send to duplicate
p.populate_set_out(filename, 'Duplicate')
aws_access_key = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.re_aws_access_key, item.get_id(), item_content)
if aws_access_key:
aws_secret_key = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.re_aws_secret_key, item.get_id(), item_content)
if aws_access_key or google_api_key:
to_print = f'ApiKey;{item.get_source()};{item.get_date()};{item.get_basename()};'
if google_api_key:
print(f'found google api key: {to_print}')
self.redis_logger.warning(f'{to_print}Checked {len(google_api_key)} found Google API Key;{item.get_id()}')
msg = f'infoleak:automatic-detection="google-api-key";{item.get_id()}'
self.send_message_to_queue('Tags', msg)
# # TODO: # FIXME: AWS regex/validate/sanityze KEY + SECRET KEY
if aws_access_key:
print(f'found AWS key: {to_print}')
self.redis_logger.warning(f'{to_print}Checked {len(aws_access_key)} found AWS Key;{item.get_id()}')
if aws_secret_key:
print(f'found AWS secret key')
self.redis_logger.warning(f'{to_print}Checked {len(aws_secret_key)} found AWS secret Key;{item.get_id()}')
msg = 'infoleak:automatic-detection="aws-key";{}'.format(item.get_id())
self.send_message_to_queue('Tags', msg)
# Tags
msg = f'infoleak:automatic-detection="api-key";{item.get_id()}'
self.send_message_to_queue('Tags', msg)
# Send to duplicate
self.send_message_to_queue('Duplicate', item.get_id())
if r_match:
return (google_api_key, aws_access_key, aws_secret_key)
if __name__ == "__main__":
publisher.port = 6380
publisher.channel = "Script"
config_section = 'ApiKey'
p = Process(config_section)
publisher.info("ApiKey started")
message = p.get_from_set()
# TODO improve REGEX
regex_aws_access_key = re.compile(r'(?<![A-Z0-9])=[A-Z0-9]{20}(?![A-Z0-9])')
regex_aws_secret_key = re.compile(r'(?<!=[A-Za-z0-9+])=[A-Za-z0-9+]{40}(?![A-Za-z0-9+])')
regex_google_api_key = re.compile(r'=AIza[0-9a-zA-Z-_]{35}')
while True:
message = p.get_from_set()
if message is not None:
search_api_key(message)
else:
publisher.debug("Script ApiKey is Idling 10s")
time.sleep(10)
module = ApiKey()
module.run()

View File

@ -1,24 +1,16 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The ZMQ_Sub_Onion Module
The Onion Module
============================
This module is consuming the Redis-list created by the ZMQ_Sub_Onion_Q Module.
It trying to extract url from paste and returning only ones which are tor
related (.onion)
..seealso:: Paste method (get_regex)
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
the same Subscriber name in both of them.
This module extract url from item and returning only ones which are tor
related (.onion). All These urls are send to the crawler discovery queue.
Requirements
------------
*Need running Redis instances. (Redis)
*Need the ZMQ_Sub_Onion_Q Module running to be able to work properly.
"""
import time
@ -27,6 +19,7 @@ import os
import sys
import re
# project packages
from module.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib import crawlers

View File

@ -8,11 +8,28 @@ import unittest
sys.path.append(os.environ['AIL_BIN'])
# Modules Classes
from ApiKey import ApiKey
from Onion import Onion
# projects packages
# project packages
import lib.crawlers as crawlers
class Test_Module_ApiKey(unittest.TestCase):
def setUp(self):
self.module_obj = ApiKey()
def test_module(self):
item_id = 'tests/2021/01/01/api_keys.gz'
google_api_key = 'AIza00000000000000000000000_example-KEY'
aws_access_key = 'AKIAIOSFODNN7EXAMPLE'
aws_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
matches = self.module_obj.compute(f'{item_id} 3', r_match=True)
self.assertCountEqual(matches[0], [google_api_key])
self.assertCountEqual(matches[1], [aws_access_key])
self.assertCountEqual(matches[2], [aws_secret_key])
class Test_Module_Onion(unittest.TestCase):
def setUp(self):
@ -41,3 +58,6 @@ class Test_Module_Onion(unittest.TestCase):
else:
# # TODO: check warning logs
pass
if __name__ == '__main__':
unittest.main()