misp-modules/misp_modules/modules/expansion/threatcrowd.py

161 lines
4.4 KiB
Python
Raw Normal View History

import json
import requests
import re
# Generic error mapping (not referenced by the functions shown in this file).
misperrors = {"error": "Error"}

# Attribute types handled by handler() ('input') and the types that the
# lookup functions emit ('output').
mispattributes = {
    "input": [
        "hostname",
        "domain",
        "ip-src",
        "ip-dst",
        "md5",
        "sha1",
        "sha256",
        "sha512",
        "whois-registrant-email",
    ],
    "output": [
        "domain",
        "ip-src",
        "ip-dst",
        "text",
        "md5",
        "sha1",
        "sha256",
        "sha512",
        "hostname",
        "whois-registrant-email",
    ],
}

# possible module-types: 'expansion', 'hover' or both
moduleinfo = {
    "version": "1",
    "author": "chrisdoman",
    "description": "Get information from ThreatCrowd",
    "module-type": ["expansion"],
}

# No configuration options; version() attaches this list to moduleinfo.
moduleconfig = []
# Avoid adding windows update to enrichment etc.
def isBlacklisted(value):
    """Return True when *value* is a known-noisy indicator to skip.

    A value is blacklisted when it equals one of the exact entries or starts
    with one of the prefix entries (e.g. any address in ``192.168.56.x``).

    The previous implementation tested ``value in entry``, i.e. whether the
    candidate was a substring of a blacklist entry. That missed
    ``192.168.56.101`` and wrongly flagged fragments such as ``"8"``,
    ``"com"`` or the empty string.

    Args:
        value: Domain name or IP address string to check.

    Returns:
        bool: True if the value should be excluded from enrichment.
    """
    exact_entries = ('8.8.8.8', '255.255.255.255', 'time.windows.com')
    prefix_entries = ('192.168.56.',)

    # Empty / missing values are never blacklisted.
    if not value:
        return False
    return value in exact_entries or value.startswith(prefix_entries)
def valid_ip(ip):
    """Return True when *ip* looks like a dotted-quad IPv4 address.

    The string must consist of four dot-separated groups of one to three
    digits, and every group must have an integer value between 0 and 255.
    """
    match = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", ip)
    if not match:
        return False
    for octet in match.groups():
        if not 0 <= int(octet) <= 255:
            return False
    return True
# One DNS label: 1-63 characters of letters, digits or hyphens, with no
# leading or trailing hyphen. Compiled once instead of on every call, and
# written as a raw string so that ``\d`` is not an invalid string escape.
_DOMAIN_LABEL_RE = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)


def valid_domain(hostname):
    """Return True when *hostname* is a syntactically valid host/domain name.

    Args:
        hostname: Candidate name; a single trailing dot is tolerated.

    Returns:
        bool: False for empty input, names longer than 255 characters, or
        names where any dot-separated label fails the label pattern.
    """
    # Guard against empty input: the original indexed hostname[-1] and
    # raised IndexError on "".
    if not hostname or len(hostname) > 255:
        return False
    if hostname.endswith("."):
        hostname = hostname[:-1]  # strip exactly one dot from the right
    return all(_DOMAIN_LABEL_RE.match(label) for label in hostname.split("."))
def valid_email(email):
    """Return True when *email* matches the module's simple address pattern.

    The pattern accepts a local part of word characters, dots, plus and
    minus signs, followed by ``@``, a single run of word characters, a dot
    and a lowercase suffix of two or three letters. Addresses with
    sub-domains or longer suffixes are therefore rejected.
    """
    pattern = r"^[\w\.\+\-]+\@[\w]+\.[a-z]{2,3}$"
    return re.search(pattern, email) is not None
def handler(q=False):
    """Expand the attributes in a JSON request via the ThreatCrowd lookups.

    Args:
        q: JSON-encoded request string, or False when no request is given.

    Returns:
        False when *q* is False; otherwise a dict ``{"results": [...]}``
        holding the combined lookup results with duplicates removed
        (first occurrence kept, original order preserved).
    """
    if q is False:
        return False

    query = json.loads(q)
    collected = []

    # Lookups run in a fixed order: IPs, domains/hostnames, hashes, e-mail.
    for key in ("ip-src", "ip-dst"):
        if key in query:
            collected += getIP(query[key])
    for key in ("domain", "hostname"):
        if key in query:
            collected += getDomain(query[key])
    for key in ("md5", "sha1", "sha256", "sha512"):
        if key in query:
            collected += getHash(query[key])
    if "whois-registrant-email" in query:
        collected += getEmail(query["whois-registrant-email"])

    # Result entries are dicts (unhashable), so de-duplicate by list
    # membership rather than with a set.
    unique = []
    for entry in collected:
        if entry not in unique:
            unique.append(entry)

    return {"results": unique}
def getHash(hash, timeout=30):
    """Look up a file hash in the ThreatCrowd v2 file report API.

    Args:
        hash: Hash string sent as the ``resource`` query parameter. (The
            name shadows the builtin but is kept for caller compatibility.)
        timeout: Seconds to wait for the HTTP request; without a timeout
            ``requests.get`` can block indefinitely.

    Returns:
        list[dict]: ``hostname`` entries for reported domains that are
        valid and not blacklisted, followed by ``ip-dst`` entries for
        reported IPs that are not blacklisted.

    Raises:
        Exceptions from ``requests.get`` (including timeouts) and from
        ``json.loads`` are not caught here.
    """
    # Pass the value via ``params`` so it is URL-encoded instead of being
    # concatenated raw into the query string.
    response = requests.get(
        "https://www.threatcrowd.org/searchApi/v2/file/report/",
        params={"resource": hash},
        timeout=timeout,
    )
    report = json.loads(response.text)

    results = []
    if "domains" in report:
        for domain in report["domains"]:
            if not isBlacklisted(domain) and valid_domain(domain):
                results.append({"types": ["hostname"], "values": [domain]})
    if "ips" in report:
        for ip in report["ips"]:
            if not isBlacklisted(ip):
                results.append({"types": ["ip-dst"], "values": [ip]})
    return results
def getIP(ip, timeout=30):
    """Look up an IP address in the ThreatCrowd v2 IP report API.

    Args:
        ip: IP address string sent as the ``ip`` query parameter.
        timeout: Seconds to wait for the HTTP request; without a timeout
            ``requests.get`` can block indefinitely.

    Returns:
        list[dict]: ``hostname`` entries for each resolution whose
        ``domain`` passes ``valid_domain``, followed by ``md5`` entries for
        every reported hash.

    Raises:
        Exceptions from ``requests.get`` (including timeouts) and from
        ``json.loads`` are not caught here.
    """
    # Pass the value via ``params`` so it is URL-encoded instead of being
    # concatenated raw into the query string.
    response = requests.get(
        "https://www.threatcrowd.org/searchApi/v2/ip/report/",
        params={"ip": ip},
        timeout=timeout,
    )
    report = json.loads(response.text)

    results = []
    if "resolutions" in report:
        for resolution in report["resolutions"]:
            if "domain" in resolution and valid_domain(resolution["domain"]):
                results.append({"types": ["hostname"], "values": [resolution["domain"]]})
    if "hashes" in report:
        for file_hash in report["hashes"]:
            results.append({"types": ["md5"], "values": [file_hash]})
    return results
def getEmail(email, timeout=30):
    """Look up an e-mail address in the ThreatCrowd v2 email report API.

    Args:
        email: Address sent as the ``email`` query parameter.
        timeout: Seconds to wait for the HTTP request; without a timeout
            ``requests.get`` can block indefinitely.

    Returns:
        list[dict]: ``hostname`` entries for reported domains that are
        valid and not blacklisted.

    Raises:
        Exceptions from ``requests.get`` (including timeouts) and from
        ``json.loads`` are not caught here.
    """
    # Pass the value via ``params`` so characters such as ``+`` or ``&`` in
    # the address are URL-encoded rather than corrupting the query string.
    response = requests.get(
        "https://www.threatcrowd.org/searchApi/v2/email/report/",
        params={"email": email},
        timeout=timeout,
    )
    report = json.loads(response.text)

    results = []
    if "domains" in report:
        for domain in report["domains"]:
            if not isBlacklisted(domain) and valid_domain(domain):
                results.append({"types": ["hostname"], "values": [domain]})
    return results
def getDomain(domain, timeout=30):
    """Look up a domain in the ThreatCrowd v2 domain report API.

    Args:
        domain: Domain or hostname sent as the ``domain`` query parameter.
        timeout: Seconds to wait for the HTTP request; without a timeout
            ``requests.get`` can block indefinitely.

    Returns:
        list[dict]: ``ip-dst`` entries for each resolution carrying an
        ``ip_address``, then ``whois-registrant-email`` entries for each
        reported e-mail, then ``md5`` entries for each reported hash.

    Raises:
        Exceptions from ``requests.get`` (including timeouts) and from
        ``json.loads`` are not caught here.
    """
    # Pass the value via ``params`` so it is URL-encoded instead of being
    # concatenated raw into the query string.
    response = requests.get(
        "https://www.threatcrowd.org/searchApi/v2/domain/report/",
        params={"domain": domain},
        timeout=timeout,
    )
    report = json.loads(response.text)

    results = []
    if "resolutions" in report:
        for resolution in report["resolutions"]:
            if "ip_address" in resolution:
                results.append({"types": ["ip-dst"], "values": [resolution["ip_address"]]})
    if "emails" in report:
        for email in report["emails"]:
            results.append({"types": ["whois-registrant-email"], "values": [email]})
    if "hashes" in report:
        for file_hash in report["hashes"]:
            results.append({"types": ["md5"], "values": [file_hash]})
    return results
def introspection():
    """Return the module-level ``mispattributes`` mapping of input/output types."""
    attributes = mispattributes
    return attributes
def version():
    """Return ``moduleinfo`` after storing ``moduleconfig`` under its 'config' key.

    The module-level ``moduleinfo`` dict is updated in place and that same
    object is returned.
    """
    moduleinfo.update({'config': moduleconfig})
    return moduleinfo