From 0b167df5b06eaa0dd8173127db2e69feacfc63db Mon Sep 17 00:00:00 2001 From: Milo Volpicelli Date: Fri, 20 Oct 2023 13:22:26 +0000 Subject: [PATCH] actual expand implementation --- .../modules/expansion/cluster25_expand.py | 76 +++++++------------ 1 file changed, 26 insertions(+), 50 deletions(-) diff --git a/misp_modules/modules/expansion/cluster25_expand.py b/misp_modules/modules/expansion/cluster25_expand.py index 6828e05..ace8d8b 100755 --- a/misp_modules/modules/expansion/cluster25_expand.py +++ b/misp_modules/modules/expansion/cluster25_expand.py @@ -1,7 +1,8 @@ import json import requests +import time from typing import List -from . import check_input_attribute, standard_error_message +from . import standard_error_message from pymisp import MISPAttribute, MISPEvent moduleinfo = {'version': '0.1', @@ -47,47 +48,29 @@ def handler(q=False): misperrors['error'] = 'Cluster25 base_url is missing' return misperrors - # validate attribute - if not request.get('attribute') or not check_input_attribute(request['attribute']): - return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} - attribute = request.get('attribute') - if not any(input_type == attribute.get('type') for input_type in misp_type_in): - return {'error': 'Unsupported attribute type.'} + # validate params + if not request.get('params') or not request.get('params', {}).get('value'): + return {'error': f'{standard_error_message}, which should contain a value.'} client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url']) - attribute = MISPAttribute() - attribute.from_dict(**request.get('attribute')) - r = {"results": []} - valid_type = False - - try: - for k in misp_type_in: - if attribute.type == k: - # map the MISP type to the Cluster25 type - r['results'].append(lookup_indicator(client, attribute)) - valid_type = True - except Exception as e: - return {'error': f"{e}"} - - if not valid_type: - misperrors['error'] = "Unsupported attributes type" - return misperrors - return {'results': r.get('results').pop()} + return {'results': lookup_indicator(client, request.get('params'))} -def lookup_indicator(client, ref_attribute): - result = client.search_indicators(ref_attribute.value) +def lookup_indicator(client, indicator): + + result = client.investigate(indicator) + misp_event = MISPEvent() - misp_event.add_attribute(**ref_attribute) + if result.get('error'): + return result - for item in result: - if mapping_out.get(item.get('type')): - r = mapping_out[item.get('type')].copy() - r['value'] = item - attribute = MISPAttribute() - attribute.from_dict(**r) - misp_event.add_attribute(**attribute) + if mapping_out.get(result.get('indicator_type')): + r = mapping_out[result.get('indicator_type')].copy() + r['value'] = result + attribute = MISPAttribute() + attribute.from_dict(**r) + misp_event.add_attribute(**attribute) event = json.loads(misp_event.to_json()) return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])} @@ -108,27 +91,20 @@ class Cluster25CTI: self.client_secret = customer_key self.base_url = base_url self.current_token = self._get_cluster25_token() + self.headers = {"Authorization": f"Bearer {self.current_token}"} - def _get_cluster25_token(self) -> str: + def _get_cluster25_token(self): payload = {"client_id": self.client_id, "client_secret": self.client_secret} r = requests.post(url=f"{self.base_url}/token", json=payload, headers={"Content-Type": "application/json"}) - if r.status_code != 200: - raise Exception( - f"Unable to retrieve the token from C25 platform, status {r.status_code}" - ) + return {'error': f"Unable to retrieve the token from C25 platform, status {r.status_code}"} return r.json()["data"]["token"] - def search_indicators(self, indicator_type) -> List[dict]: - headers = {"Authorization": f"Bearer {self.current_token}"} - params = {'type': indicator_type, 'include_info': True} - r = requests.get(url=f"{self.base_url}/indicators", params=params, headers=headers) - + def investigate(self, indicator) -> dict: + params = {'indicator': indicator.get('value')} + r = requests.get(url=f"{self.base_url}/investigate", params=params, headers=self.headers) if r.status_code != 200: - raise Exception( - f"Unable to retrieve the indicators from C25 platform, status {r.status_code}" - ) + return{'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' " + f"from C25 platform, status {r.status_code}"} return r.json()["data"] - -