mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			155 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			155 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| import json
 | |
| import requests
 | |
| import re
 | |
| 
 | |
| misperrors = {'error': 'Error'}
 | |
| mispattributes = {'input': ["hostname", "domain", "ip-src", "ip-dst", "md5", "sha1", "sha256", "sha512"],
 | |
|                   'output': ["domain", "ip-src", "ip-dst", "text", "md5", "sha1", "sha256", "sha512", "email"]
 | |
|                   }
 | |
| 
 | |
| # possible module-types: 'expansion', 'hover' or both
 | |
| moduleinfo = {'version': '1', 'author': 'chrisdoman',
 | |
|               'description': 'Get information from AlienVault OTX',
 | |
|               'module-type': ['expansion']}
 | |
| 
 | |
| # We're not actually using the API key yet
 | |
| moduleconfig = ["apikey"]
 | |
| 
 | |
| # Avoid adding windows update to enrichment etc.
 | |
| def isBlacklisted(value):
 | |
|     blacklist = ['0.0.0.0', '8.8.8.8', '255.255.255.255', '192.168.56.' , 'time.windows.com']
 | |
| 
 | |
|     for b in blacklist:
 | |
|         if value in b:
 | |
|             return False
 | |
| 
 | |
|     return True
 | |
| 
 | |
| def valid_ip(ip):
 | |
|     m = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", ip)
 | |
|     return bool(m) and all(map(lambda n: 0 <= int(n) <= 255, m.groups()))
 | |
| 
 | |
| def findAll(data, keys):
 | |
|     a = []
 | |
|     if isinstance(data, dict):
 | |
|         for key in data.keys():
 | |
|             if key == keys:
 | |
|                 a.append(data[key])
 | |
|             else:
 | |
|                 if isinstance(data[key], (dict, list)):
 | |
|                     a += findAll(data[key], keys)
 | |
|     if isinstance(data, list):
 | |
|         for i in data:
 | |
|             a += findAll(i, keys)
 | |
| 
 | |
|     return a
 | |
| 
 | |
| def valid_email(email):
 | |
|     return bool(re.search(r"[a-zA-Z0-9!#$%&'*+\/=?^_`{|}~-]+(?:\.[a-zA-Z0-9!#$%&'*+\/=?^_`{|}~-]+)*@(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?\.)+[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?", email))
 | |
| 
 | |
| def handler(q=False):
 | |
|     if q is False:
 | |
|         return False
 | |
| 
 | |
|     q = json.loads(q)
 | |
| 
 | |
|     key = q["config"]["apikey"]
 | |
| 
 | |
|     r = {"results": []}
 | |
| 
 | |
|     if "ip-src" in q:
 | |
|         r["results"] += getIP(q["ip-src"], key)
 | |
|     if "ip-dst" in q:
 | |
|         r["results"] += getIP(q["ip-dst"], key)
 | |
|     if "domain" in q:
 | |
|         r["results"] += getDomain(q["domain"], key)
 | |
|     if 'hostname' in q:
 | |
|         r["results"] += getDomain(q['hostname'], key)
 | |
|     if 'md5' in q:
 | |
|         r["results"] += getHash(q['md5'], key)
 | |
|     if 'sha1' in q:
 | |
|         r["results"] += getHash(q['sha1'], key)
 | |
|     if 'sha256' in q:
 | |
|         r["results"] += getHash(q['sha256'], key)
 | |
|     if 'sha512' in q:
 | |
|         r["results"] += getHash(q['sha512'], key)
 | |
| 
 | |
|     uniq = []
 | |
|     for res in r["results"]:
 | |
|         if res not in uniq:
 | |
|             uniq.append(res)
 | |
|     r["results"] = uniq
 | |
|     return r
 | |
| 
 | |
| 
 | |
| def getHash(hash, key):
 | |
| 
 | |
|     ret = []
 | |
|     req = json.loads(requests.get("https://otx.alienvault.com/otxapi/indicator/file/analysis/" + hash).text)
 | |
| 
 | |
|     for ip in findAll(req, "dst"):
 | |
|         if not isBlacklisted(ip) and valid_ip(ip):
 | |
|             ret.append({"types": ["ip-dst", "ip-src"], "values": [ip]})
 | |
| 
 | |
|     for domain in findAll(req, "hostname"):
 | |
|         if "." in domain and not isBlacklisted(domain):
 | |
|             ret.append({"types": ["hostname"], "values": [domain]})
 | |
| 
 | |
|     return ret
 | |
| 
 | |
| 
 | |
| def getIP(ip, key):
 | |
|     ret = []
 | |
|     req = json.loads( requests.get("https://otx.alienvault.com/otxapi/indicator/ip/malware/" + ip + "?limit=1000").text )
 | |
| 
 | |
|     for hash in findAll(req, "hash"):
 | |
|         ret.append({"types": ["sha256"], "values": [hash]})
 | |
| 
 | |
| 
 | |
|     req = json.loads( requests.get("https://otx.alienvault.com/otxapi/indicator/ip/passive_dns/" + ip).text )
 | |
| 
 | |
|     for hostname in findAll(req, "hostname"):
 | |
|         if not isBlacklisted(hostname):
 | |
|             ret.append({"types": ["hostname"], "values": [hostname]})
 | |
| 
 | |
| 
 | |
|     return ret
 | |
| 
 | |
| 
 | |
| def getDomain(domain, key):
 | |
| 
 | |
|     ret = []
 | |
| 
 | |
|     req = json.loads( requests.get("https://otx.alienvault.com/otxapi/indicator/domain/malware/" + domain + "?limit=1000").text )
 | |
| 
 | |
|     for hash in findAll(req, "hash"):
 | |
|         ret.append({"types": ["sha256"], "values": [hash]})
 | |
| 
 | |
|     req = json.loads(requests.get("https://otx.alienvault.com/otxapi/indicator/domain/whois/" + domain).text)
 | |
| 
 | |
|     for domain in findAll(req, "domain"):
 | |
|         ret.append({"types": ["hostname"], "values": [domain]})
 | |
| 
 | |
|     for email in findAll(req, "value"):
 | |
|         if valid_email(email):
 | |
|             ret.append({"types": ["email"], "values": [domain]})
 | |
| 
 | |
|     for domain in findAll(req, "hostname"):
 | |
|         if "." in domain and not isBlacklisted(domain):
 | |
|             ret.append({"types": ["hostname"], "values": [domain]})
 | |
| 
 | |
|     req = json.loads(requests.get("https://otx.alienvault.com/otxapi/indicator/hostname/passive_dns/" + domain).text)
 | |
|     for ip in findAll(req, "address"):
 | |
|         if valid_ip(ip):
 | |
|             ret.append({"types": ["ip-dst"], "values": [ip]})
 | |
| 
 | |
|     return ret
 | |
| 
 | |
| def introspection():
 | |
|     return mispattributes
 | |
| 
 | |
| 
 | |
| def version():
 | |
|     moduleinfo['config'] = moduleconfig
 | |
|     return moduleinfo
 |