mirror of https://github.com/MISP/misp-modules
340 lines
17 KiB
Python
340 lines
17 KiB
Python
import ipaddress
|
|
import json
|
|
import logging
|
|
|
|
|
|
try:
|
|
from greynoise import GreyNoise
|
|
except ImportError:
|
|
print("greynoise module not installed.")
|
|
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
|
|
|
from . import check_input_attribute, standard_error_message
|
|
|
|
logger = logging.getLogger("greynoise")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
misperrors = {"error": "Error"}
|
|
mispattributes = {"input": ["ip-src", "ip-dst", "vulnerability"], "format": "misp_standard"}
|
|
moduleinfo = {
|
|
"version": "1.2",
|
|
"author": "Brad Chiappetta <brad@greynoise.io>",
|
|
"description": "Used to query IP and CVE intel from GreyNoise",
|
|
"module-type": ["expansion", "hover"],
|
|
}
|
|
moduleconfig = ["api_key", "api_type"]
|
|
|
|
|
|
class GreyNoiseParser:
|
|
def __init__(self, attribute):
|
|
self.misp_event = MISPEvent()
|
|
self.attribute = MISPAttribute()
|
|
self.attribute.from_dict(**attribute)
|
|
self.misp_event.add_attribute(**self.attribute)
|
|
self.ip_address_enrich_mapping = {
|
|
"noise": {"type": "boolean", "object_relation": "noise"},
|
|
"riot": {"type": "boolean", "object_relation": "riot"},
|
|
"classification": {"type": "text", "object_relation": "classification"},
|
|
"actor": {"type": "text", "object_relation": "actor"},
|
|
"trust_level": {"type": "text", "object_relation": "trust-level"},
|
|
"name": {"type": "text", "object_relation": "provider"},
|
|
"first_seen": {"type": "datetime", "object_relation": "first-seen"},
|
|
"last_seen": {"type": "datetime", "object_relation": "last-seen"},
|
|
"link": {"type": "url", "object_relation": "link"},
|
|
"last_updated": {"type": "datetime", "object_relation": "last-seen"},
|
|
}
|
|
self.ip_address_hover_mapping = {
|
|
"noise": {"type": "boolean", "object_relation": "noise"},
|
|
"riot": {"type": "boolean", "object_relation": "riot"},
|
|
"classification": {"type": "text", "object_relation": "classification"},
|
|
"actor": {"type": "text", "object_relation": "actor"},
|
|
"tags": {"type": "text", "object_relation": "tags"},
|
|
"cve": {"type": "text", "object_relation": "cve"},
|
|
"vpn": {"type": "text", "object_relation": "vpn"},
|
|
"vpn_service": {"type": "text", "object_relation": "vpn_service"},
|
|
"bot": {"type": "text", "object_relation": "bot"},
|
|
"first_seen": {"type": "datetime", "object_relation": "first-seen"},
|
|
"last_seen": {"type": "datetime", "object_relation": "last-seen"},
|
|
"spoofable": {"type": "datetime", "object_relation": "spoofable"},
|
|
"link": {"type": "url", "object_relation": "link"},
|
|
"category": {"type": "text", "object_relation": "category"},
|
|
"name": {"type": "text", "object_relation": "provider"},
|
|
"trust_level": {"type": "text", "object_relation": "trust-level"},
|
|
"last_updated": {"type": "datetime", "object_relation": "last_updated"},
|
|
}
|
|
self.ip_address_metadata_mapping = {
|
|
"tor": {"type": "text", "object_relation": "tor"},
|
|
"asn": {"type": "AS", "object_relation": "asn"},
|
|
"city": {"type": "text", "object_relation": "city"},
|
|
"country_code": {"type": "text", "object_relation": "country-code"},
|
|
"country": {"type": "text", "object_relation": "country"},
|
|
"organization": {"type": "text", "object_relation": "organization"},
|
|
"destination_country_codes": {"type": "text", "object_relation": "destination-country-codes"},
|
|
"destination_countries": {"type": "text", "object_relation": "destination-countries"},
|
|
"category": {"type": "text", "object_relation": "category"},
|
|
"rdns": {"type": "text", "object_relation": "rdns"},
|
|
}
|
|
self.vulnerability_mapping = {
|
|
"id": {"type": "text", "object_relation": "id"},
|
|
"details": {"type": "text", "object_relation": "details"},
|
|
"count": {"type": "text", "object_relation": "total-count"},
|
|
"benign": {"type": "text", "object_relation": "benign-count"},
|
|
"malicious": {"type": "text", "object_relation": "malicious-count"},
|
|
"unknown": {"type": "text", "object_relation": "unknown-count"},
|
|
}
|
|
|
|
def query_greynoise_ip_hover(self, api_key, api_type):
|
|
if api_type == "enterprise":
|
|
logger.info(f"Starting hover enrichment for: {self.attribute.value} via GreyNoise ENT API")
|
|
integration_name = "greynoise-misp-module-{}".format(moduleinfo["version"])
|
|
session = GreyNoise(api_key=api_key, integration_name=integration_name)
|
|
quick_response = session.quick(self.attribute.value)
|
|
if len(quick_response) != 1:
|
|
misperrors["error"] = "Quick IP lookup returned unexpected response"
|
|
return misperrors
|
|
else:
|
|
quick_response = quick_response[0]
|
|
context_response = session.ip(self.attribute.value)
|
|
riot_response = session.riot(self.attribute.value)
|
|
|
|
if riot_response and "trust_level" in riot_response:
|
|
if riot_response["trust_level"] == "1":
|
|
riot_response["trust_level"] = "1 - Reasonably Ignore"
|
|
if riot_response["trust_level"] == "2":
|
|
riot_response["trust_level"] = "2 - Commonly Seen"
|
|
|
|
if context_response and riot_response:
|
|
response = context_response.copy()
|
|
response.update(riot_response)
|
|
response.update(quick_response)
|
|
elif context_response:
|
|
response = context_response.copy()
|
|
response.update(quick_response)
|
|
elif riot_response:
|
|
response = riot_response.copy()
|
|
response.update(quick_response)
|
|
|
|
response["link"] = "https://viz.greynoise.io/ip/" + self.attribute.value
|
|
|
|
ip_address_attributes = []
|
|
for feature, mapping in self.ip_address_hover_mapping.items():
|
|
logger.debug(f"Checking feature {feature}")
|
|
if response.get(feature):
|
|
if feature in ["cve", "tags"]:
|
|
response[feature] = ", ".join(response[feature])
|
|
if feature == "vpn_service" and response[feature] == "N/A":
|
|
continue
|
|
if feature == "actor" and response[feature] == "unknown":
|
|
continue
|
|
attribute = {"value": response[feature]}
|
|
logger.debug(f"Adding Feature: {feature}, Attribute: {attribute}")
|
|
attribute.update(mapping)
|
|
ip_address_attributes.append(attribute)
|
|
if "metadata" in context_response:
|
|
for feature, mapping in self.ip_address_metadata_mapping.items():
|
|
logger.debug(f"Checking metadata feature {feature}")
|
|
if response["metadata"].get(feature):
|
|
if feature in ["destination_countries", "destination_country_codes"]:
|
|
response["metadata"][feature] = ", ".join(response["metadata"][feature])
|
|
attribute = {"value": response["metadata"][feature]}
|
|
logger.debug(f"Adding Feature: {feature}, Attribute: {attribute}")
|
|
attribute.update(mapping)
|
|
ip_address_attributes.append(attribute)
|
|
if ip_address_attributes:
|
|
logger.debug("creating greynoise ip object")
|
|
gn_ip_object = MISPObject("greynoise-ip-details")
|
|
for attribute in ip_address_attributes:
|
|
logger.debug(f"adding attribute {attribute}")
|
|
gn_ip_object.add_attribute(**attribute)
|
|
logger.debug(f"attribute id: {self.attribute.uuid}")
|
|
gn_ip_object.add_reference(self.attribute.uuid, "describes")
|
|
self.misp_event.add_object(gn_ip_object)
|
|
else:
|
|
logger.info(f"Starting hover enrichment for: {self.attribute.value} via GreyNoise Community API")
|
|
integration_name = "greynoise-community-misp-module-{}".format(moduleinfo["version"])
|
|
session = GreyNoise(api_key=api_key, integration_name=integration_name, offering="community")
|
|
community_response = session.ip(self.attribute.value)
|
|
|
|
if "noise" in community_response and community_response["noise"]:
|
|
community_response["actor"] = community_response["name"]
|
|
community_response.pop("name")
|
|
|
|
ip_address_attributes = []
|
|
for feature, mapping in self.ip_address_hover_mapping.items():
|
|
if community_response.get(feature):
|
|
if feature == "actor" and community_response[feature] == "unknown":
|
|
continue
|
|
attribute = {"value": community_response[feature]}
|
|
attribute.update(mapping)
|
|
ip_address_attributes.append(attribute)
|
|
if ip_address_attributes:
|
|
ip_address_object = MISPObject("greynoise-ip-details")
|
|
for attribute in ip_address_attributes:
|
|
ip_address_object.add_attribute(**attribute)
|
|
ip_address_object.add_reference(self.attribute.uuid, "describes")
|
|
self.misp_event.add_object(ip_address_object)
|
|
|
|
def query_greynoise_ip_expansion(self, api_key, api_type):
|
|
if api_type == "enterprise":
|
|
logger.info(f"Starting expansion enrichment for: {self.attribute.value} via GreyNoise ENT API")
|
|
integration_name = "greynoise-misp-module-{}".format(moduleinfo["version"])
|
|
session = GreyNoise(api_key=api_key, integration_name=integration_name)
|
|
quick_response = session.quick(self.attribute.value)
|
|
if len(quick_response) != 1:
|
|
misperrors["error"] = "Quick IP lookup returned unexpected response"
|
|
return misperrors
|
|
else:
|
|
quick_response = quick_response[0]
|
|
context_response = session.ip(self.attribute.value)
|
|
riot_response = session.riot(self.attribute.value)
|
|
|
|
if riot_response and "trust_level" in riot_response:
|
|
if riot_response["trust_level"] == "1":
|
|
riot_response["trust_level"] = "1 - Reasonably Ignore"
|
|
if riot_response["trust_level"] == "2":
|
|
riot_response["trust_level"] = "2 - Commonly Seen"
|
|
|
|
if context_response and riot_response:
|
|
response = context_response.copy()
|
|
response.update(riot_response)
|
|
response.update(quick_response)
|
|
elif context_response:
|
|
response = context_response.copy()
|
|
response.update(quick_response)
|
|
elif riot_response:
|
|
response = riot_response.copy()
|
|
response.update(quick_response)
|
|
|
|
response["link"] = "https://viz.greynoise.io/ip/" + self.attribute.value
|
|
|
|
ip_address_attributes = []
|
|
for feature, mapping in self.ip_address_enrich_mapping.items():
|
|
logger.debug(f"Checking feature {feature}")
|
|
if response.get(feature):
|
|
if feature == "actor" and response[feature] == "unknown":
|
|
continue
|
|
attribute = {"value": response[feature]}
|
|
logger.debug(f"Adding Feature: {feature}, Attribute: {attribute}")
|
|
attribute.update(mapping)
|
|
ip_address_attributes.append(attribute)
|
|
if ip_address_attributes:
|
|
logger.debug("creating greynoise ip object")
|
|
gn_ip_object = MISPObject("greynoise-ip")
|
|
for attribute in ip_address_attributes:
|
|
logger.debug(f"adding attribute {attribute}")
|
|
gn_ip_object.add_attribute(**attribute)
|
|
logger.debug(f"attribute id: {self.attribute.uuid}")
|
|
gn_ip_object.add_reference(self.attribute.uuid, "describes")
|
|
self.misp_event.add_object(gn_ip_object)
|
|
else:
|
|
logger.info(f"Starting expansion enrichment for: {self.attribute.value} via GreyNoise Community API")
|
|
integration_name = "greynoise-community-misp-module-{}".format(moduleinfo["version"])
|
|
session = GreyNoise(api_key=api_key, integration_name=integration_name, offering="community")
|
|
community_response = session.ip(self.attribute.value)
|
|
|
|
if "noise" in community_response and community_response["noise"]:
|
|
community_response["actor"] = community_response["name"]
|
|
community_response.pop("name")
|
|
|
|
ip_address_attributes = []
|
|
for feature, mapping in self.ip_address_enrich_mapping.items():
|
|
if community_response.get(feature):
|
|
if feature == "actor" and community_response[feature] == "unknown":
|
|
continue
|
|
attribute = {"value": community_response[feature]}
|
|
attribute.update(mapping)
|
|
ip_address_attributes.append(attribute)
|
|
if ip_address_attributes:
|
|
ip_address_object = MISPObject("greynoise-ip")
|
|
for attribute in ip_address_attributes:
|
|
ip_address_object.add_attribute(**attribute)
|
|
ip_address_object.add_reference(self.attribute.uuid, "describes")
|
|
self.misp_event.add_object(ip_address_object)
|
|
|
|
def query_greynoise_vulnerability(self, api_key, api_type):
|
|
if api_type == "enterprise":
|
|
logger.info(f"Starting expansion enrichment for: {self.attribute.value} via GreyNoise ENT API")
|
|
integration_name = "greynoise-misp-module-{}".format(moduleinfo["version"])
|
|
session = GreyNoise(api_key=api_key, integration_name=integration_name)
|
|
querystring = f"last_seen:1w cve:{self.attribute.value}"
|
|
else:
|
|
misperrors["error"] = "Vulnerability Not Supported with Community API Key"
|
|
return misperrors
|
|
|
|
response = session.stats(querystring)
|
|
|
|
if "stats" in response:
|
|
response["details"] = (
|
|
"The IP count below reflects the number of IPs seen "
|
|
"by GreyNoise in the last 7 days scanning for this CVE."
|
|
)
|
|
response["id"] = self.attribute.value
|
|
classifications = response["stats"].get("classifications")
|
|
for item in classifications:
|
|
if item["classification"] == "benign":
|
|
value = item["count"]
|
|
response["benign"] = value
|
|
if item["classification"] == "unknown":
|
|
value = item["count"]
|
|
response["unknown"] = value
|
|
if item["classification"] == "malicious":
|
|
value = item["count"]
|
|
response["malicious"] = value
|
|
vulnerability_attributes = []
|
|
for feature, mapping in self.vulnerability_mapping.items():
|
|
if response.get(feature):
|
|
attribute = {"value": response[feature]}
|
|
attribute.update(mapping)
|
|
vulnerability_attributes.append(attribute)
|
|
if vulnerability_attributes:
|
|
vulnerability_object = MISPObject("greynoise-vuln-info")
|
|
for attribute in vulnerability_attributes:
|
|
vulnerability_object.add_attribute(**attribute)
|
|
vulnerability_object.add_reference(self.attribute.uuid, "describes")
|
|
self.misp_event.add_object(vulnerability_object)
|
|
|
|
def get_result(self):
|
|
event = json.loads(self.misp_event.to_json())
|
|
results = {key: event[key] for key in ("Attribute", "Object") if (key in event and event[key])}
|
|
return {"results": results}
|
|
|
|
|
|
def handler(q=False):
|
|
if q is False:
|
|
return False
|
|
request = json.loads(q)
|
|
if not request.get("config", {}).get("api_key"):
|
|
return {"error": "GreyNoise API Key required, but missing"}
|
|
if not request.get("config", {}).get("api_type"):
|
|
return {"error": "GreyNoise API type of enterprise or community required, but missing"}
|
|
if not request.get("attribute") or not check_input_attribute(request["attribute"]):
|
|
return {"error": f"{standard_error_message}, which should contain at least a type, a value and an uuid."}
|
|
attribute = request["attribute"]
|
|
if attribute["type"] not in mispattributes["input"]:
|
|
return {"error": "Unsupported attribute type."}
|
|
greynoise_parser = GreyNoiseParser(attribute)
|
|
|
|
if attribute["type"] in ["ip-dst", "ip-src"]:
|
|
try:
|
|
ipaddress.IPv4Address(attribute["value"])
|
|
if "persistent" in request:
|
|
greynoise_parser.query_greynoise_ip_hover(request["config"]["api_key"], request["config"]["api_type"])
|
|
else:
|
|
greynoise_parser.query_greynoise_ip_expansion(request["config"]["api_key"], request["config"]["api_type"])
|
|
except ValueError:
|
|
return {"error": "Not a valid IPv4 address"}
|
|
|
|
if attribute["type"] == "vulnerability":
|
|
greynoise_parser.query_greynoise_vulnerability(request["config"]["api_key"], request["config"]["api_type"])
|
|
|
|
return greynoise_parser.get_result()
|
|
|
|
|
|
def introspection():
|
|
return mispattributes
|
|
|
|
|
|
def version():
|
|
moduleinfo["config"] = moduleconfig
|
|
return moduleinfo
|