actual expand implementation

pull/642/head
Milo Volpicelli 2023-10-20 13:22:26 +00:00
parent a4893d997d
commit 0b167df5b0
1 changed files with 26 additions and 50 deletions

View File

@ -1,7 +1,8 @@
import json import json
import requests import requests
import time
from typing import List from typing import List
from . import check_input_attribute, standard_error_message from . import standard_error_message
from pymisp import MISPAttribute, MISPEvent from pymisp import MISPAttribute, MISPEvent
moduleinfo = {'version': '0.1', moduleinfo = {'version': '0.1',
@ -47,47 +48,29 @@ def handler(q=False):
misperrors['error'] = 'Cluster25 base_url is missing' misperrors['error'] = 'Cluster25 base_url is missing'
return misperrors return misperrors
# validate attribute # validate params
if not request.get('attribute') or not check_input_attribute(request['attribute']): if not request.get('params') or not request.get('params', {}).get('value'):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} return {'error': f'{standard_error_message}, which should contain a value.'}
attribute = request.get('attribute')
if not any(input_type == attribute.get('type') for input_type in misp_type_in):
return {'error': 'Unsupported attribute type.'}
client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url']) client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url'])
attribute = MISPAttribute() return {'results': lookup_indicator(client, request.get('params'))}
attribute.from_dict(**request.get('attribute'))
r = {"results": []}
valid_type = False
try:
for k in misp_type_in:
if attribute.type == k:
# map the MISP type to the Cluster25 type
r['results'].append(lookup_indicator(client, attribute))
valid_type = True
except Exception as e:
return {'error': f"{e}"}
if not valid_type:
misperrors['error'] = "Unsupported attributes type"
return misperrors
return {'results': r.get('results').pop()}
def lookup_indicator(client, ref_attribute): def lookup_indicator(client, indicator):
result = client.search_indicators(ref_attribute.value)
result = client.investigate(indicator)
misp_event = MISPEvent() misp_event = MISPEvent()
misp_event.add_attribute(**ref_attribute) if result.get('error'):
return result
for item in result: if mapping_out.get(result.get('indicator_type')):
if mapping_out.get(item.get('type')): r = mapping_out[result.get('indicator_type')].copy()
r = mapping_out[item.get('type')].copy() r['value'] = result
r['value'] = item attribute = MISPAttribute()
attribute = MISPAttribute() attribute.from_dict(**r)
attribute.from_dict(**r) misp_event.add_attribute(**attribute)
misp_event.add_attribute(**attribute)
event = json.loads(misp_event.to_json()) event = json.loads(misp_event.to_json())
return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])} return {'Object': event.get('Object', []), 'Attribute': event.get('Attribute', [])}
@ -108,27 +91,20 @@ class Cluster25CTI:
self.client_secret = customer_key self.client_secret = customer_key
self.base_url = base_url self.base_url = base_url
self.current_token = self._get_cluster25_token() self.current_token = self._get_cluster25_token()
self.headers = {"Authorization": f"Bearer {self.current_token}"}
def _get_cluster25_token(self) -> str: def _get_cluster25_token(self):
payload = {"client_id": self.client_id, "client_secret": self.client_secret} payload = {"client_id": self.client_id, "client_secret": self.client_secret}
r = requests.post(url=f"{self.base_url}/token", json=payload, headers={"Content-Type": "application/json"}) r = requests.post(url=f"{self.base_url}/token", json=payload, headers={"Content-Type": "application/json"})
if r.status_code != 200: if r.status_code != 200:
raise Exception( return {'error': f"Unable to retrieve the token from C25 platform, status {r.status_code}"}
f"Unable to retrieve the token from C25 platform, status {r.status_code}"
)
return r.json()["data"]["token"] return r.json()["data"]["token"]
def search_indicators(self, indicator_type) -> List[dict]: def investigate(self, indicator) -> dict:
headers = {"Authorization": f"Bearer {self.current_token}"} params = {'indicator': indicator.get('value')}
params = {'type': indicator_type, 'include_info': True} r = requests.get(url=f"{self.base_url}/investigate", params=params, headers=self.headers)
r = requests.get(url=f"{self.base_url}/indicators", params=params, headers=headers)
if r.status_code != 200: if r.status_code != 200:
raise Exception( return{'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' "
f"Unable to retrieve the indicators from C25 platform, status {r.status_code}" f"from C25 platform, status {r.status_code}"}
)
return r.json()["data"] return r.json()["data"]