Merge branch 'main' of github.com:MISP/misp-modules

pull/613/head
Christian Studer 2023-03-27 17:48:30 +02:00
commit 78b06ea51b
8 changed files with 331 additions and 225 deletions

View File

@ -44,7 +44,7 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj
* [GeoIP](misp_modules/modules/expansion/geoip_country.py) - a hover and expansion module to get GeoIP information from geolite/maxmind.
* [GeoIP_City](misp_modules/modules/expansion/geoip_city.py) - a hover and expansion module to get GeoIP City information from geolite/maxmind.
* [GeoIP_ASN](misp_modules/modules/expansion/geoip_asn.py) - a hover and expansion module to get GeoIP ASN information from geolite/maxmind.
* [Greynoise](misp_modules/modules/expansion/greynoise.py) - a hover to get information from greynoise.
* [GreyNoise](misp_modules/modules/expansion/greynoise.py) - a hover and expansion module to get IP and CVE information from GreyNoise.
* [hashdd](misp_modules/modules/expansion/hashdd.py) - a hover module to check file hashes against [hashdd.com](http://www.hashdd.com) including NSLR dataset.
* [hibp](misp_modules/modules/expansion/hibp.py) - a hover module to lookup against Have I Been Pwned?
* [html_to_markdown](misp_modules/modules/expansion/html_to_markdown.py) - Simple HTML to markdown converter

View File

@ -47,6 +47,7 @@ filelock==3.8.0 ; python_version >= '3.7'
frozenlist==1.3.1 ; python_version >= '3.7'
future==0.18.2 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
geoip2==4.6.0
greynoise==2.0.0
h11==0.12.0 ; python_version >= '3.6'
httpcore==0.15.0 ; python_version >= '3.7'
httplib2==0.20.4 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

View File

@ -0,0 +1,15 @@
{
"description": "Hover module to lookup an IP in CrowdSec's CTI",
"logo": "crowdsec.png",
"requirements": [
"A CrowdSec CTI API key. Get yours by following https://docs.crowdsec.net/docs/cti_api/getting_started/#getting-an-api-key"
],
"input": "An IP address.",
"output": "IP Lookup information from CrowdSec CTI API",
"references": [
"https://www.crowdsec.net/",
"https://docs.crowdsec.net/docs/cti_api/getting_started",
"https://app.crowdsec.net/"
],
"features": "This module enables IP lookup from CrowdSec CTI API. It provides information about the IP, such as what kind of attacks it has been participant of as seen by CrowdSec's network. It also includes enrichment by CrowdSec like background noise score, aggressivity over time etc."
}

View File

@ -1,254 +1,333 @@
import ipaddress
import json
import logging
import requests
from pymisp import MISPEvent, MISPObject
try:
from greynoise import GreyNoise
except ImportError:
print("greynoise module not installed.")
from pymisp import MISPAttribute, MISPEvent, MISPObject
from . import check_input_attribute, standard_error_message
logger = logging.getLogger("greynoise")
logger.setLevel(logging.INFO)
misperrors = {"error": "Error"}
mispattributes = {"input": ["ip-dst", "ip-src", "vulnerability"], "output": ["text"]}
mispattributes = {"input": ["ip-src", "ip-dst", "vulnerability"], "format": "misp_standard"}
moduleinfo = {
"version": "1.1",
"version": "1.2",
"author": "Brad Chiappetta <brad@greynoise.io>",
"description": "Module to access GreyNoise.io API.",
"module-type": ["hover"],
"description": "Used to query IP and CVE intel from GreyNoise",
"module-type": ["expansion", "hover"],
}
moduleconfig = ["api_key", "api_type"]
codes_mapping = {
"0x00": "The IP has never been observed scanning the Internet",
"0x01": "The IP has been observed by the GreyNoise sensor network",
"0x02": "The IP has been observed scanning the GreyNoise sensor network, "
"but has not completed a full connection, meaning this can be spoofed",
"0x03": "The IP is adjacent to another host that has been directly observed by the GreyNoise sensor network",
"0x04": "Reserved",
"0x05": "This IP is commonly spoofed in Internet-scan activity",
"0x06": "This IP has been observed as noise, but this host belongs to a cloud provider where IPs can be "
"cycled frequently",
"0x07": "This IP is invalid",
"0x08": "This IP was classified as noise, but has not been observed engaging in Internet-wide scans or "
"attacks in over 90 days",
"0x09": "IP was found in RIOT",
"0x10": "IP has been observed by the GreyNoise sensor network and is in RIOT",
}
vulnerability_mapping = {
"id": ("vulnerability", "CVE #"),
"details": ("text", "Details"),
"count": ("text", "Total Scanner Count"),
}
enterprise_context_basic_mapping = {"ip": ("text", "IP Address"), "code_message": ("text", "Code Message")}
enterprise_context_advanced_mapping = {
"noise": ("text", "Is Internet Background Noise"),
"link": ("link", "Visualizer Link"),
"classification": ("text", "Classification"),
"actor": ("text", "Actor"),
"tags": ("text", "Tags"),
"cve": ("text", "CVEs"),
"first_seen": ("text", "First Seen Scanning"),
"last_seen": ("text", "Last Seen Scanning"),
"vpn": ("text", "Known VPN Service"),
"vpn_service": ("text", "VPN Service Name"),
"bot": ("text", "Known BOT"),
}
enterprise_context_advanced_metadata_mapping = {
"asn": ("text", "ASN"),
"rdns": ("text", "rDNS"),
"category": ("text", "Category"),
"tor": ("text", "Known Tor Exit Node"),
"region": ("text", "Region"),
"city": ("text", "City"),
"country": ("text", "Country"),
"country_code": ("text", "Country Code"),
"organization": ("text", "Organization"),
}
enterprise_riot_mapping = {
"riot": ("text", "Is Common Business Service"),
"link": ("link", "Visualizer Link"),
"category": ("text", "RIOT Category"),
"name": ("text", "Provider Name"),
"trust_level": ("text", "RIOT Trust Level"),
"last_updated": ("text", "Last Updated"),
}
community_found_mapping = {
"ip": ("text", "IP Address"),
"noise": ("text", "Is Internet Background Noise"),
"riot": ("text", "Is Common Business Service"),
"classification": ("text", "Classification"),
"last_seen": ("text", "Last Seen"),
"name": ("text", "Name"),
"link": ("link", "Visualizer Link"),
}
community_not_found_mapping = {
"ip": ("text", "IP Address"),
"noise": ("text", "Is Internet Background Noise"),
"riot": ("text", "Is Common Business Service"),
"message": ("text", "Message"),
}
misp_event = MISPEvent()
def handler(q=False): # noqa: C901
if q is False:
return False
request = json.loads(q)
if not request.get("config") or not request["config"].get("api_key"):
return {"error": "Missing Greynoise API key."}
class GreyNoiseParser:
def __init__(self, attribute):
self.misp_event = MISPEvent()
self.attribute = MISPAttribute()
self.attribute.from_dict(**attribute)
self.misp_event.add_attribute(**self.attribute)
self.ip_address_enrich_mapping = {
"noise": {"type": "boolean", "object_relation": "noise"},
"riot": {"type": "boolean", "object_relation": "riot"},
"classification": {"type": "text", "object_relation": "classification"},
"actor": {"type": "text", "object_relation": "actor"},
"trust_level": {"type": "text", "object_relation": "trust-level"},
"name": {"type": "text", "object_relation": "provider"},
"first_seen": {"type": "datetime", "object_relation": "first-seen"},
"last_seen": {"type": "datetime", "object_relation": "last-seen"},
"link": {"type": "url", "object_relation": "link"},
"last_updated": {"type": "datetime", "object_relation": "last-seen"},
}
self.ip_address_hover_mapping = {
"noise": {"type": "boolean", "object_relation": "noise"},
"riot": {"type": "boolean", "object_relation": "riot"},
"classification": {"type": "text", "object_relation": "classification"},
"actor": {"type": "text", "object_relation": "actor"},
"tags": {"type": "text", "object_relation": "tags"},
"cve": {"type": "text", "object_relation": "cve"},
"vpn": {"type": "text", "object_relation": "vpn"},
"vpn_service": {"type": "text", "object_relation": "vpn_service"},
"bot": {"type": "text", "object_relation": "bot"},
"first_seen": {"type": "datetime", "object_relation": "first-seen"},
"last_seen": {"type": "datetime", "object_relation": "last-seen"},
"spoofable": {"type": "datetime", "object_relation": "spoofable"},
"link": {"type": "url", "object_relation": "link"},
"category": {"type": "text", "object_relation": "category"},
"name": {"type": "text", "object_relation": "provider"},
"trust_level": {"type": "text", "object_relation": "trust-level"},
"last_updated": {"type": "datetime", "object_relation": "last_updated"},
}
self.ip_address_metadata_mapping = {
"tor": {"type": "text", "object_relation": "tor"},
"asn": {"type": "AS", "object_relation": "asn"},
"city": {"type": "text", "object_relation": "city"},
"country_code": {"type": "text", "object_relation": "country-code"},
"country": {"type": "text", "object_relation": "country"},
"organization": {"type": "text", "object_relation": "organization"},
"destination_country_codes": {"type": "text", "object_relation": "destination-country-codes"},
"destination_countries": {"type": "text", "object_relation": "destination-countries"},
"category": {"type": "text", "object_relation": "category"},
"rdns": {"type": "text", "object_relation": "rdns"},
}
self.vulnerability_mapping = {
"id": {"type": "text", "object_relation": "id"},
"details": {"type": "text", "object_relation": "details"},
"count": {"type": "text", "object_relation": "total-count"},
"benign": {"type": "text", "object_relation": "benign-count"},
"malicious": {"type": "text", "object_relation": "malicious-count"},
"unknown": {"type": "text", "object_relation": "unknown-count"},
}
headers = {
"Accept": "application/json",
"key": request["config"]["api_key"],
"User-Agent": "greynoise-misp-module-{}".format(moduleinfo["version"]),
}
if not (request.get("vulnerability") or request.get("ip-dst") or request.get("ip-src")):
misperrors["error"] = "Vulnerability id missing"
return misperrors
ip = ""
vulnerability = ""
if request.get("ip-dst"):
ip = request.get("ip-dst")
elif request.get("ip-src"):
ip = request.get("ip-src")
else:
vulnerability = request.get("vulnerability")
if ip:
if request["config"]["api_type"] and request["config"]["api_type"] == "enterprise":
greynoise_api_url = "https://api.greynoise.io/v2/noise/quick/"
else:
greynoise_api_url = "https://api.greynoise.io/v3/community/"
response = requests.get(f"{greynoise_api_url}{ip}", headers=headers) # Real request for IP Query
if response.status_code == 200:
if request["config"]["api_type"] == "enterprise":
response = response.json()
enterprise_context_object = MISPObject("greynoise-ip-context")
for feature in ("ip", "code_message"):
if feature == "code_message":
value = codes_mapping[response.get("code")]
else:
value = response.get(feature)
if value:
attribute_type, relation = enterprise_context_basic_mapping[feature]
enterprise_context_object.add_attribute(relation, **{"type": attribute_type, "value": value})
if response["noise"]:
greynoise_api_url = "https://api.greynoise.io/v2/noise/context/"
context_response = requests.get(f"{greynoise_api_url}{ip}", headers=headers)
context_response = context_response.json()
context_response["link"] = "https://www.greynoise.io/viz/ip/" + ip
if "tags" in context_response:
context_response["tags"] = ",".join(context_response["tags"])
if "cve" in context_response:
context_response["cve"] = ",".join(context_response["cve"])
for feature in enterprise_context_advanced_mapping.keys():
value = context_response.get(feature)
if value:
attribute_type, relation = enterprise_context_advanced_mapping[feature]
enterprise_context_object.add_attribute(
relation, **{"type": attribute_type, "value": value}
)
for feature in enterprise_context_advanced_metadata_mapping.keys():
value = context_response["metadata"].get(feature)
if value:
attribute_type, relation = enterprise_context_advanced_metadata_mapping[feature]
enterprise_context_object.add_attribute(
relation, **{"type": attribute_type, "value": value}
)
if response["riot"]:
greynoise_api_url = "https://api.greynoise.io/v2/riot/"
riot_response = requests.get(f"{greynoise_api_url}{ip}", headers=headers)
riot_response = riot_response.json()
riot_response["link"] = "https://www.greynoise.io/viz/riot/" + ip
for feature in enterprise_riot_mapping.keys():
value = riot_response.get(feature)
if value:
attribute_type, relation = enterprise_riot_mapping[feature]
enterprise_context_object.add_attribute(
relation, **{"type": attribute_type, "value": value}
)
misp_event.add_object(enterprise_context_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ("Attribute", "Object") if (key in event and event[key])}
return {"results": results}
def query_greynoise_ip_hover(self, api_key, api_type):
if api_type == "enterprise":
logger.info(f"Starting hover enrichment for: {self.attribute.value} via GreyNoise ENT API")
integration_name = "greynoise-misp-module-{}".format(moduleinfo["version"])
session = GreyNoise(api_key=api_key, integration_name=integration_name)
quick_response = session.quick(self.attribute.value)
if len(quick_response) != 1:
misperrors["error"] = "Quick IP lookup returned unexpected response"
return misperrors
else:
response = response.json()
community_context_object = MISPObject("greynoise-community-ip-context")
for feature in community_found_mapping.keys():
value = response.get(feature)
if value:
attribute_type, relation = community_found_mapping[feature]
community_context_object.add_attribute(relation, **{"type": attribute_type, "value": value})
misp_event.add_object(community_context_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ("Attribute", "Object") if (key in event and event[key])}
return {"results": results}
if response.status_code == 404 and request["config"]["api_type"] != "enterprise":
response = response.json()
community_context_object = MISPObject("greynoise-community-ip-context")
for feature in community_not_found_mapping.keys():
value = response.get(feature)
if value:
attribute_type, relation = community_not_found_mapping[feature]
community_context_object.add_attribute(relation, **{"type": attribute_type, "value": value})
misp_event.add_object(community_context_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ("Attribute", "Object") if (key in event and event[key])}
return {"results": results}
quick_response = quick_response[0]
context_response = session.ip(self.attribute.value)
riot_response = session.riot(self.attribute.value)
if vulnerability:
if request["config"]["api_type"] and request["config"]["api_type"] == "enterprise":
greynoise_api_url = "https://api.greynoise.io/v2/experimental/gnql/stats"
querystring = {"query": f"last_seen:1w cve:{vulnerability}"}
if riot_response and "trust_level" in riot_response:
if riot_response["trust_level"] == "1":
riot_response["trust_level"] = "1 - Reasonably Ignore"
if riot_response["trust_level"] == "2":
riot_response["trust_level"] = "2 - Commonly Seen"
if context_response and riot_response:
response = context_response.copy()
response.update(riot_response)
response.update(quick_response)
elif context_response:
response = context_response.copy()
response.update(quick_response)
elif riot_response:
response = riot_response.copy()
response.update(quick_response)
response["link"] = "https://viz.greynoise.io/ip/" + self.attribute.value
ip_address_attributes = []
for feature, mapping in self.ip_address_hover_mapping.items():
logger.debug(f"Checking feature {feature}")
if response.get(feature):
if feature in ["cve", "tags"]:
response[feature] = ", ".join(response[feature])
if feature == "vpn_service" and response[feature] == "N/A":
continue
if feature == "actor" and response[feature] == "unknown":
continue
attribute = {"value": response[feature]}
logger.debug(f"Adding Feature: {feature}, Attribute: {attribute}")
attribute.update(mapping)
ip_address_attributes.append(attribute)
if "metadata" in context_response:
for feature, mapping in self.ip_address_metadata_mapping.items():
logger.debug(f"Checking metadata feature {feature}")
if response["metadata"].get(feature):
if feature in ["destination_countries", "destination_country_codes"]:
response["metadata"][feature] = ", ".join(response["metadata"][feature])
attribute = {"value": response["metadata"][feature]}
logger.debug(f"Adding Feature: {feature}, Attribute: {attribute}")
attribute.update(mapping)
ip_address_attributes.append(attribute)
if ip_address_attributes:
logger.debug("creating greynoise ip object")
gn_ip_object = MISPObject("greynoise-ip-details")
for attribute in ip_address_attributes:
logger.debug(f"adding attribute {attribute}")
gn_ip_object.add_attribute(**attribute)
logger.debug(f"attribute id: {self.attribute.uuid}")
gn_ip_object.add_reference(self.attribute.uuid, "describes")
self.misp_event.add_object(gn_ip_object)
else:
logger.info(f"Starting hover enrichment for: {self.attribute.value} via GreyNoise Community API")
integration_name = "greynoise-community-misp-module-{}".format(moduleinfo["version"])
session = GreyNoise(api_key=api_key, integration_name=integration_name, offering="community")
community_response = session.ip(self.attribute.value)
if "noise" in community_response and community_response["noise"]:
community_response["actor"] = community_response["name"]
community_response.pop("name")
ip_address_attributes = []
for feature, mapping in self.ip_address_hover_mapping.items():
if community_response.get(feature):
if feature == "actor" and community_response[feature] == "unknown":
continue
attribute = {"value": community_response[feature]}
attribute.update(mapping)
ip_address_attributes.append(attribute)
if ip_address_attributes:
ip_address_object = MISPObject("greynoise-ip-details")
for attribute in ip_address_attributes:
ip_address_object.add_attribute(**attribute)
ip_address_object.add_reference(self.attribute.uuid, "describes")
self.misp_event.add_object(ip_address_object)
def query_greynoise_ip_expansion(self, api_key, api_type):
if api_type == "enterprise":
logger.info(f"Starting expansion enrichment for: {self.attribute.value} via GreyNoise ENT API")
integration_name = "greynoise-misp-module-{}".format(moduleinfo["version"])
session = GreyNoise(api_key=api_key, integration_name=integration_name)
quick_response = session.quick(self.attribute.value)
if len(quick_response) != 1:
misperrors["error"] = "Quick IP lookup returned unexpected response"
return misperrors
else:
quick_response = quick_response[0]
context_response = session.ip(self.attribute.value)
riot_response = session.riot(self.attribute.value)
if riot_response and "trust_level" in riot_response:
if riot_response["trust_level"] == "1":
riot_response["trust_level"] = "1 - Reasonably Ignore"
if riot_response["trust_level"] == "2":
riot_response["trust_level"] = "2 - Commonly Seen"
if context_response and riot_response:
response = context_response.copy()
response.update(riot_response)
response.update(quick_response)
elif context_response:
response = context_response.copy()
response.update(quick_response)
elif riot_response:
response = riot_response.copy()
response.update(quick_response)
response["link"] = "https://viz.greynoise.io/ip/" + self.attribute.value
ip_address_attributes = []
for feature, mapping in self.ip_address_enrich_mapping.items():
logger.debug(f"Checking feature {feature}")
if response.get(feature):
if feature == "actor" and response[feature] == "unknown":
continue
attribute = {"value": response[feature]}
logger.debug(f"Adding Feature: {feature}, Attribute: {attribute}")
attribute.update(mapping)
ip_address_attributes.append(attribute)
if ip_address_attributes:
logger.debug("creating greynoise ip object")
gn_ip_object = MISPObject("greynoise-ip")
for attribute in ip_address_attributes:
logger.debug(f"adding attribute {attribute}")
gn_ip_object.add_attribute(**attribute)
logger.debug(f"attribute id: {self.attribute.uuid}")
gn_ip_object.add_reference(self.attribute.uuid, "describes")
self.misp_event.add_object(gn_ip_object)
else:
logger.info(f"Starting expansion enrichment for: {self.attribute.value} via GreyNoise Community API")
integration_name = "greynoise-community-misp-module-{}".format(moduleinfo["version"])
session = GreyNoise(api_key=api_key, integration_name=integration_name, offering="community")
community_response = session.ip(self.attribute.value)
if "noise" in community_response and community_response["noise"]:
community_response["actor"] = community_response["name"]
community_response.pop("name")
ip_address_attributes = []
for feature, mapping in self.ip_address_enrich_mapping.items():
if community_response.get(feature):
if feature == "actor" and community_response[feature] == "unknown":
continue
attribute = {"value": community_response[feature]}
attribute.update(mapping)
ip_address_attributes.append(attribute)
if ip_address_attributes:
ip_address_object = MISPObject("greynoise-ip")
for attribute in ip_address_attributes:
ip_address_object.add_attribute(**attribute)
ip_address_object.add_reference(self.attribute.uuid, "describes")
self.misp_event.add_object(ip_address_object)
def query_greynoise_vulnerability(self, api_key, api_type):
if api_type == "enterprise":
logger.info(f"Starting expansion enrichment for: {self.attribute.value} via GreyNoise ENT API")
integration_name = "greynoise-misp-module-{}".format(moduleinfo["version"])
session = GreyNoise(api_key=api_key, integration_name=integration_name)
querystring = f"last_seen:1w cve:{self.attribute.value}"
else:
misperrors["error"] = "Vulnerability Not Supported with Community API Key"
return misperrors
response = requests.get(f"{greynoise_api_url}", headers=headers, params=querystring) # Real request
response = session.stats(querystring)
if response.status_code == 200:
response = response.json()
vulnerability_object = MISPObject("greynoise-vuln-info")
if "stats" in response:
response["details"] = (
"The IP count below reflects the number of IPs seen "
"by GreyNoise in the last 7 days scanning for this CVE."
)
response["id"] = vulnerability
for feature in ("id", "details", "count"):
value = response.get(feature)
if value:
attribute_type, relation = vulnerability_mapping[feature]
vulnerability_object.add_attribute(relation, **{"type": attribute_type, "value": value})
response["id"] = self.attribute.value
classifications = response["stats"].get("classifications")
for item in classifications:
if item["classification"] == "benign":
value = item["count"]
attribute_type, relation = ("text", "Benign Scanner Count")
vulnerability_object.add_attribute(relation, **{"type": attribute_type, "value": value})
response["benign"] = value
if item["classification"] == "unknown":
value = item["count"]
attribute_type, relation = ("text", "Unknown Scanner Count")
vulnerability_object.add_attribute(relation, **{"type": attribute_type, "value": value})
response["unknown"] = value
if item["classification"] == "malicious":
value = item["count"]
attribute_type, relation = ("text", "Malicious Scanner Count")
vulnerability_object.add_attribute(relation, **{"type": attribute_type, "value": value})
misp_event.add_object(vulnerability_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ("Attribute", "Object") if (key in event and event[key])}
return {"results": results}
response["malicious"] = value
vulnerability_attributes = []
for feature, mapping in self.vulnerability_mapping.items():
if response.get(feature):
attribute = {"value": response[feature]}
attribute.update(mapping)
vulnerability_attributes.append(attribute)
if vulnerability_attributes:
vulnerability_object = MISPObject("greynoise-vuln-info")
for attribute in vulnerability_attributes:
vulnerability_object.add_attribute(**attribute)
vulnerability_object.add_reference(self.attribute.uuid, "describes")
self.misp_event.add_object(vulnerability_object)
# There is an error
errors = {
400: "Bad request.",
404: "IP not observed scanning the internet or contained in RIOT data set.",
401: "Unauthorized. Please check your API key.",
429: "Too many requests. You've hit the rate-limit.",
}
try:
misperrors["error"] = errors[response.status_code]
except KeyError:
misperrors["error"] = f"GreyNoise API not accessible (HTTP {response.status_code})"
return misperrors
def get_result(self):
event = json.loads(self.misp_event.to_json())
results = {key: event[key] for key in ("Attribute", "Object") if (key in event and event[key])}
return {"results": results}
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get("config", {}).get("api_key"):
return {"error": "GreyNoise API Key required, but missing"}
if not request.get("config", {}).get("api_type"):
return {"error": "GreyNoise API type of enterprise or community required, but missing"}
if not request.get("attribute") or not check_input_attribute(request["attribute"]):
return {"error": f"{standard_error_message}, which should contain at least a type, a value and an uuid."}
attribute = request["attribute"]
if attribute["type"] not in mispattributes["input"]:
return {"error": "Unsupported attribute type."}
greynoise_parser = GreyNoiseParser(attribute)
if attribute["type"] in ["ip-dst", "ip-src"]:
try:
ipaddress.IPv4Address(attribute["value"])
if "persistent" in request:
greynoise_parser.query_greynoise_ip_hover(request["config"]["api_key"], request["config"]["api_type"])
else:
greynoise_parser.query_greynoise_ip_expansion(request["config"]["api_key"], request["config"]["api_type"])
except ValueError:
return {"error": "Not a valid IPv4 address"}
if attribute["type"] == "vulnerability":
greynoise_parser.query_greynoise_vulnerability(request["config"]["api_key"], request["config"]["api_type"])
return greynoise_parser.get_result()
def introspection():

View File

@ -8,7 +8,7 @@ import json
misperrors = {"error": "Error"}
types_to_use = ['sha1', 'md5', 'domain', 'ip', 'url']
types_to_use = ['sha256', 'sha1', 'md5', 'domain', 'ip', 'url']
userConfig = {
@ -20,11 +20,17 @@ inputSource = ['event']
outputFileExtension = 'kql'
responseType = 'application/txt'
moduleinfo = {'version': '1.0', 'author': 'Julien Bachmann, Hacknowledge',
moduleinfo = {'version': '1.1', 'author': 'Julien Bachmann, Hacknowledge, Maik Wuerth',
'description': 'Defender for Endpoint KQL hunting query export module',
'module-type': ['export']}
def handle_sha256(value, period):
query = f"""find in (DeviceAlertEvents, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where SHA256 == '{value}' or InitiatingProcessSHA1 == '{value}'"""
return query.replace('\n', ' ')
def handle_sha1(value, period):
query = f"""find in (DeviceAlertEvents, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where SHA1 == '{value}' or InitiatingProcessSHA1 == '{value}'"""
@ -56,6 +62,7 @@ def handle_url(value, period):
handlers = {
'sha256': handle_sha256,
'sha1': handle_sha1,
'md5': handle_md5,
'domain': handle_domain,
@ -75,6 +82,10 @@ def handler(q=False):
for attribute in event["Attribute"]:
if attribute['type'] in types_to_use:
output = output + handlers[attribute['type']](attribute['value'], config['Period']) + '\n'
for obj in event["Object"]:
for attribute in obj["Attribute"]:
if attribute['type'] in types_to_use:
output = output + handlers[attribute['type']](attribute['value'], config['Period']) + '\n'
r = {"response": [], "data": str(base64.b64encode(bytes(output, 'utf-8')), 'utf-8')}
return r

View File

@ -16,7 +16,7 @@ edit_uri: ""
use_directory_urls: true
# Copyright
copyright: "Copyright &copy; 2019-2022 MISP Project"
copyright: "Copyright &copy; 2019-2023 MISP Project"
# Options
extra:

View File

@ -297,7 +297,7 @@ class TestExpansions(unittest.TestCase):
)
else:
response = self.misp_modules_post(query)
self.assertEqual(self.get_errors(response), 'Missing Greynoise API key.')
self.assertEqual(self.get_errors(response), 'GreyNoise API Key required, but missing')
@unittest.skip("Service doesn't work")
def test_ipasn(self):