From 196939d205deb42cab5788f131399d8e03587446 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Fri, 12 May 2023 12:16:22 +0200 Subject: [PATCH] chg: [crowdsec] Updated the module to support the recently added `crowdsec-ip-context` object template --- misp_modules/modules/expansion/crowdsec.py | 124 ++++++++------------- 1 file changed, 46 insertions(+), 78 deletions(-) diff --git a/misp_modules/modules/expansion/crowdsec.py b/misp_modules/modules/expansion/crowdsec.py index 6ba9b06..d512fd4 100644 --- a/misp_modules/modules/expansion/crowdsec.py +++ b/misp_modules/modules/expansion/crowdsec.py @@ -35,7 +35,8 @@ def handler(q=False): def _handler_v2(request_data): - ip = request_data['attribute']['value'] + attribute = request_data['attribute'] + ip = attribute['value'] crowdsec_cti = requests.get( f"https://cti.api.crowdsec.net/v2/smoke/{ip}", @@ -48,93 +49,60 @@ def _handler_v2(request_data): crowdsec_cti = crowdsec_cti.json() misp_event = MISPEvent() + misp_attribute = misp_event.add_attribute(**attribute) crowdsec_context_object = MISPObject("crowdsec-ip-context") - crowdsec_context_object.add_attribute("IP Address", **{"type": "text", "value": ip}) - crowdsec_context_object.add_attribute( - "IP Range", **{"type": "text", "value": crowdsec_cti["ip_range"]} + crowdsec_context_object.from_dict( + first_seen=crowdsec_cti["history"]["first_seen"], + last_seen=crowdsec_cti["history"]["last_seen"] ) + crowdsec_context_object.add_attribute("ip", crowdsec_cti["ip"]) + crowdsec_context_object.add_attribute("ip-range", crowdsec_cti["ip_range"]) + crowdsec_context_object.add_attribute("ip-range-score", crowdsec_cti["ip_range_score"]) crowdsec_context_object.add_attribute( - "IP Range Score", **{"type": "text", "value": crowdsec_cti["ip_range_score"]} + "country", get_country_name_from_alpha_2(crowdsec_cti["location"]["country"]) ) - crowdsec_context_object.add_attribute( - "Country", - **{ - "type": "text", - "value": get_country_name_from_alpha_2(crowdsec_cti["location"]["country"]), - }, - ) - if crowdsec_cti["location"]["city"]: + crowdsec_context_object.add_attribute("country-code", crowdsec_cti["location"]["country"]) + if crowdsec_cti["location"].get("city"): crowdsec_context_object.add_attribute( - "City", **{"type": "text", "value": crowdsec_cti["location"]["city"]} + "city", crowdsec_cti["location"]["city"] + ) + crowdsec_context_object.add_attribute("latitude", crowdsec_cti["location"]["latitude"]) + crowdsec_context_object.add_attribute("longitude", crowdsec_cti["location"]["longitude"]) + crowdsec_context_object.add_attribute("as-name", crowdsec_cti["as_name"]) + crowdsec_context_object.add_attribute("as-num", crowdsec_cti["as_num"]) + crowdsec_context_object.add_attribute("reverse-dns", crowdsec_cti["reverse_dns"]) + for behavior in crowdsec_cti["behaviors"]: + crowdsec_context_object.add_attribute( + "behaviors", behavior["label"], + comment=behavior['description'] ) - crowdsec_context_object.add_attribute( - "Latitude", **{"type": "float", "value": crowdsec_cti["location"]["latitude"]} + "attack-details", + ", ".join( + f"{scenario['name']} - {scenario['label']} ({scenario['description']})" + for scenario in crowdsec_cti["attack_details"] + ) ) crowdsec_context_object.add_attribute( - "Longitude", **{"type": "float", "value": crowdsec_cti["location"]["longitude"]} - ) - - crowdsec_context_object.add_attribute( - "AS Name", **{"type": "text", "value": crowdsec_cti["as_name"]} - ) - - crowdsec_context_object.add_attribute( - "AS Number", **{"type": "AS", "value": crowdsec_cti["as_num"]} - ) - - crowdsec_context_object.add_attribute( - "Reverse DNS", **{"type": "domain", "value": crowdsec_cti["reverse_dns"]} - ) - - crowdsec_context_object.add_attribute( - "Attack Categories", - **{ - "type": "text", - "value": ",".join( - [attack_category["label"] for attack_category in crowdsec_cti["behaviors"]] - ), - }, - ) - - crowdsec_context_object.add_attribute( - "Triggered Scenarios", - **{ - "type": "text", - "value": ",".join([scenario["name"] for scenario in crowdsec_cti["attack_details"]]), - }, - ) - - crowdsec_context_object.add_attribute( - "Top 10 Target Countries", - **{ - "type": "float", - "value": ",".join( - map(get_country_name_from_alpha_2, crowdsec_cti["target_countries"].keys()) - ), - }, - ) - - crowdsec_context_object.add_attribute( - "Trust", **{"type": "float", "value": crowdsec_cti["scores"]["overall"]["trust"]} - ) - - crowdsec_context_object.add_attribute( - "First Seen", **{"type": "datetime", "value": crowdsec_cti["history"]["first_seen"]} - ) - - crowdsec_context_object.add_attribute( - "Last Seen", **{"type": "datetime", "value": crowdsec_cti["history"]["last_seen"]} - ) - - for time_period, indicators in crowdsec_cti["scores"].items(): - tp = " ".join(map(str.capitalize, time_period.split("_"))) - - for indicator_type, indicator_value in indicators.items(): - crowdsec_context_object.add_attribute( - f"{tp} {indicator_type.capitalize()}", **{"type": "float", "value": indicator_value} + "target-countries", + ", ".join( + map( + get_country_name_from_alpha_2, + crowdsec_cti["target_countries"].keys() ) - + ) + ) + crowdsec_context_object.add_attribute("trust", crowdsec_cti["scores"]["overall"]["trust"]) + scores = [] + for time_period, indicators in crowdsec_cti["scores"].items(): + tp = ' '.join(map(str.capitalize, time_period.split('_'))) + indicator = ( + f'{indicator_type.capitalize()}: {indicator_value}' + for indicator_type, indicator_value in indicators.items() + ) + scores.append(f"{tp}: {' - '.join(indicator)}") + crowdsec_context_object.add_attribute('scores', ', '.join(scores)) + crowdsec_context_object.add_reference(misp_attribute.uuid, 'related-to') misp_event.add_object(crowdsec_context_object) event = json.loads(misp_event.to_json())