mirror of https://github.com/MISP/misp-modules
parent
8ed344778c
commit
c4fe78b39d
3 changed files with 316 additions and 1 deletions
@ -0,0 +1,154 @@ |
||||
import json |
||||
import requests |
||||
import re |
||||
|
||||
misperrors = {'error': 'Error'} |
||||
mispattributes = {'input': ["hostname", "domain", "ip-src", "ip-dst", "md5", "sha1", "sha256", "sha512"], |
||||
'output': ["domain", "ip-src", "ip-dst", "text", "md5", "sha1", "sha256", "sha512", "email"] |
||||
} |
||||
|
||||
# possible module-types: 'expansion', 'hover' or both |
||||
moduleinfo = {'version': '1', 'author': 'chrisdoman', |
||||
'description': 'Get information from AlienVault OTX', |
||||
'module-type': ['expansion']} |
||||
|
||||
# We're not actually using the API key yet |
||||
moduleconfig = ["apikey"] |
||||
|
||||
# Avoid adding windows update to enrichment etc. |
||||
def isBlacklisted(value): |
||||
blacklist = ['0.0.0.0', '8.8.8.8', '255.255.255.255', '192.168.56.' , 'time.windows.com'] |
||||
|
||||
for b in blacklist: |
||||
if value in b: |
||||
return False |
||||
|
||||
return True |
||||
|
||||
def valid_ip(ip): |
||||
m = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", ip) |
||||
return bool(m) and all(map(lambda n: 0 <= int(n) <= 255, m.groups())) |
||||
|
||||
def findAll(data, keys): |
||||
a = [] |
||||
if isinstance(data, dict): |
||||
for key in data.keys(): |
||||
if key == keys: |
||||
a.append(data[key]) |
||||
else: |
||||
if isinstance(data[key], (dict, list)): |
||||
a += findAll(data[key], keys) |
||||
if isinstance(data, list): |
||||
for i in data: |
||||
a += findAll(i, keys) |
||||
|
||||
return a |
||||
|
||||
def valid_email(email): |
||||
return bool(re.search(r"^[\w\.\+\-]+\@[\w]+\.[a-z]{2,3}$", email)) |
||||
|
||||
def handler(q=False): |
||||
if q is False: |
||||
return False |
||||
|
||||
q = json.loads(q) |
||||
|
||||
key = q["config"]["apikey"] |
||||
|
||||
r = {"results": []} |
||||
|
||||
if "ip-src" in q: |
||||
r["results"] += getIP(q["ip-src"], key) |
||||
if "ip-dst" in q: |
||||
r["results"] += getIP(q["ip-dst"], key) |
||||
if "domain" in q: |
||||
r["results"] += getDomain(q["domain"], key) |
||||
if 'hostname' in q: |
||||
r["results"] += getDomain(q['hostname'], key) |
||||
if 'md5' in q: |
||||
r["results"] += getHash(q['md5'], key) |
||||
if 'sha1' in q: |
||||
r["results"] += getHash(q['sha1'], key) |
||||
if 'sha256' in q: |
||||
r["results"] += getHash(q['sha256'], key) |
||||
if 'sha512' in q: |
||||
r["results"] += getHash(q['sha512'], key) |
||||
|
||||
uniq = [] |
||||
for res in r["results"]: |
||||
if res not in uniq: |
||||
uniq.append(res) |
||||
r["results"] = uniq |
||||
return r |
||||
|
||||
|
||||
def getHash(hash, key): |
||||
|
||||
ret = [] |
||||
req = json.loads(requests.get("https://otx.alienvault.com/otxapi/indicator/file/analysis/" + hash).text) |
||||
|
||||
for ip in findAll(req, "dst"): |
||||
if not isBlacklisted(ip) and valid_ip(ip): |
||||
ret.append({"types": ["ip-dst", "ip-src"], "values": [ip]}) |
||||
|
||||
for domain in findAll(req, "hostname"): |
||||
if "." in domain and not isBlacklisted(domain): |
||||
ret.append({"types": ["hostname"], "values": [domain]}) |
||||
|
||||
return ret |
||||
|
||||
|
||||
def getIP(ip, key): |
||||
ret = [] |
||||
req = json.loads( requests.get("https://otx.alienvault.com/otxapi/indicator/ip/malware/" + ip + "?limit=1000").text ) |
||||
|
||||
for hash in findAll(req, "hash"): |
||||
ret.append({"types": ["sha256"], "values": [hash]}) |
||||
|
||||
|
||||
req = json.loads( requests.get("https://otx.alienvault.com/otxapi/indicator/ip/passive_dns/" + ip).text ) |
||||
|
||||
for hostname in findAll(req, "hostname"): |
||||
if not isBlacklisted(hostname): |
||||
ret.append({"types": ["hostname"], "values": [hostname]}) |
||||
|
||||
|
||||
return ret |
||||
|
||||
|
||||
def getDomain(domain, key): |
||||
|
||||
ret = [] |
||||
|
||||
req = json.loads( requests.get("https://otx.alienvault.com/otxapi/indicator/domain/malware/" + domain + "?limit=1000").text ) |
||||
|
||||
for hash in findAll(req, "hash"): |
||||
ret.append({"types": ["sha256"], "values": [hash]}) |
||||
|
||||
req = json.loads(requests.get("https://otx.alienvault.com/otxapi/indicator/domain/whois/" + domain).text) |
||||
|
||||
for domain in findAll(req, "domain"): |
||||
ret.append({"types": ["hostname"], "values": [domain]}) |
||||
|
||||
for email in findAll(req, "value"): |
||||
if valid_email(email): |
||||
ret.append({"types": ["email"], "values": [domain]}) |
||||
|
||||
for domain in findAll(req, "hostname"): |
||||
if "." in domain and not isBlacklisted(domain): |
||||
ret.append({"types": ["hostname"], "values": [domain]}) |
||||
|
||||
req = json.loads(requests.get("https://otx.alienvault.com/otxapi/indicator/hostname/passive_dns/" + domain).text) |
||||
for ip in findAll(req, "address"): |
||||
if valid_ip(ip): |
||||
ret.append({"types": ["ip-dst"], "values": [ip]}) |
||||
|
||||
return ret |
||||
|
||||
def introspection(): |
||||
return mispattributes |
||||
|
||||
|
||||
def version(): |
||||
moduleinfo['config'] = moduleconfig |
||||
return moduleinfo |
@ -0,0 +1,160 @@ |
||||
import json |
||||
import requests |
||||
import re |
||||
|
||||
misperrors = {'error': 'Error'} |
||||
mispattributes = {'input': ["hostname", "domain", "ip-src", "ip-dst", "md5", "sha1", "sha256", "sha512", "whois-registrant-email"], |
||||
'output': ["domain", "ip-src", "ip-dst", "text", "md5", "sha1", "sha256", "sha512", "hostname", "whois-registrant-email"] |
||||
} |
||||
|
||||
# possible module-types: 'expansion', 'hover' or both |
||||
moduleinfo = {'version': '1', 'author': 'chrisdoman', |
||||
'description': 'Get information from ThreatCrowd', |
||||
'module-type': ['expansion']} |
||||
|
||||
moduleconfig = [] |
||||
|
||||
|
||||
# Avoid adding windows update to enrichment etc. |
||||
def isBlacklisted(value): |
||||
blacklist = ['8.8.8.8', '255.255.255.255', '192.168.56.' , 'time.windows.com'] |
||||
|
||||
for b in blacklist: |
||||
if value in b: |
||||
return True |
||||
|
||||
return False |
||||
|
||||
def valid_ip(ip): |
||||
m = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", ip) |
||||
return bool(m) and all(map(lambda n: 0 <= int(n) <= 255, m.groups())) |
||||
|
||||
def valid_domain(hostname): |
||||
if len(hostname) > 255: |
||||
return False |
||||
if hostname[-1] == ".": |
||||
hostname = hostname[:-1] # strip exactly one dot from the right, if present |
||||
allowed = re.compile("(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE) |
||||
return all(allowed.match(x) for x in hostname.split(".")) |
||||
|
||||
def valid_email(email): |
||||
return bool(re.search(r"^[\w\.\+\-]+\@[\w]+\.[a-z]{2,3}$", email)) |
||||
|
||||
def handler(q=False): |
||||
if q is False: |
||||
return False |
||||
|
||||
q = json.loads(q) |
||||
|
||||
|
||||
r = {"results": []} |
||||
|
||||
if "ip-src" in q: |
||||
r["results"] += getIP(q["ip-src"]) |
||||
if "ip-dst" in q: |
||||
r["results"] += getIP(q["ip-dst"]) |
||||
if "domain" in q: |
||||
r["results"] += getDomain(q["domain"]) |
||||
if 'hostname' in q: |
||||
r["results"] += getDomain(q['hostname']) |
||||
if 'md5' in q: |
||||
r["results"] += getHash(q['md5']) |
||||
if 'sha1' in q: |
||||
r["results"] += getHash(q['sha1']) |
||||
if 'sha256' in q: |
||||
r["results"] += getHash(q['sha256']) |
||||
if 'sha512' in q: |
||||
r["results"] += getHash(q['sha512']) |
||||
if 'whois-registrant-email' in q: |
||||
r["results"] += getEmail(q['whois-registrant-email']) |
||||
|
||||
uniq = [] |
||||
for res in r["results"]: |
||||
if res not in uniq: |
||||
uniq.append(res) |
||||
r["results"] = uniq |
||||
return r |
||||
|
||||
|
||||
def getHash(hash): |
||||
|
||||
ret = [] |
||||
req = json.loads(requests.get("https://www.threatcrowd.org/searchApi/v2/file/report/?resource=" + hash).text) |
||||
|
||||
if "domains" in req: |
||||
domains = req["domains"] |
||||
for domain in domains: |
||||
if not isBlacklisted(domain) and valid_domain(domain): |
||||
ret.append({"types": ["hostname"], "values": [domain]}) |
||||
|
||||
if "ips" in req: |
||||
ips = req["ips"] |
||||
for ip in ips: |
||||
if not isBlacklisted(ip): |
||||
ret.append({"types": ["ip-dst"], "values": [ip]}) |
||||
|
||||
return ret |
||||
|
||||
|
||||
def getIP(ip): |
||||
ret = [] |
||||
req = json.loads( requests.get("https://www.threatcrowd.org/searchApi/v2/ip/report/?ip=" + ip).text ) |
||||
|
||||
if "resolutions" in req: |
||||
for dns in req["resolutions"]: |
||||
if "domain" in dns: |
||||
if valid_domain(dns["domain"]): |
||||
ret.append({"types": ["hostname"], "values": [dns["domain"]]}) |
||||
|
||||
if "hashes" in req: |
||||
for hash in req["hashes"]: |
||||
ret.append({"types": ["md5"], "values": [hash]}) |
||||
|
||||
|
||||
return ret |
||||
|
||||
|
||||
|
||||
def getEmail(email): |
||||
ret = [] |
||||
j = requests.get("https://www.threatcrowd.org/searchApi/v2/email/report/?email=" + email).text |
||||
req = json.loads(j) |
||||
|
||||
if "domains" in req: |
||||
domains = req["domains"] |
||||
for domain in domains: |
||||
if not isBlacklisted(domain) and valid_domain(domain): |
||||
ret.append({"types": ["hostname"], "values": [domain]}) |
||||
|
||||
return ret |
||||
|
||||
|
||||
|
||||
def getDomain(domain): |
||||
|
||||
ret = [] |
||||
req = json.loads( requests.get("https://www.threatcrowd.org/searchApi/v2/domain/report/?domain=" + domain).text ) |
||||
|
||||
if "resolutions" in req: |
||||
for dns in req["resolutions"]: |
||||
if "ip_address" in dns: |
||||
ret.append({"types": ["ip-dst"], "values": [dns["ip_address"]]}) |
||||
|
||||
if "emails" in req: |
||||
for email in req["emails"]: |
||||
ret.append({"types": ["whois-registrant-email"], "values": [email]}) |
||||
|
||||
if "hashes" in req: |
||||
for hash in req["hashes"]: |
||||
ret.append({"types": ["md5"], "values": [hash]}) |
||||
|
||||
|
||||
return ret |
||||
|
||||
def introspection(): |
||||
return mispattributes |
||||
|
||||
|
||||
def version(): |
||||
moduleinfo['config'] = moduleconfig |
||||
return moduleinfo |
Loading…
Reference in new issue