From 11cd571f44c2f5fc59b5fa20d9ec28e9724a21db Mon Sep 17 00:00:00 2001 From: Terrtia Date: Wed, 15 Feb 2023 15:18:10 +0100 Subject: [PATCH] chg: [exporter] add TheHive exporter --- bin/exporter/TheHiveExporter.py | 259 ++++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100755 bin/exporter/TheHiveExporter.py diff --git a/bin/exporter/TheHiveExporter.py b/bin/exporter/TheHiveExporter.py new file mode 100755 index 00000000..e943861c --- /dev/null +++ b/bin/exporter/TheHiveExporter.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* +""" +Importer Class +================ + +Import Content + +""" +import os +import sys +import uuid + +from abc import ABC + +from thehive4py.api import TheHiveApi +from thehive4py.models import Alert, AlertArtifact, Case, CaseObservable +import thehive4py.exceptions +from urllib3 import disable_warnings as urllib3_disable_warnings + +sys.path.append('../../configs/keys') +sys.path.append(os.environ['AIL_BIN']) +################################# +# Import Project packages +################################# +from exporter.abstract_exporter import AbstractExporter +from lib.ConfigLoader import ConfigLoader +from lib.ail_core import get_ail_uuid +from lib.objects.Items import Item +# from lib.objects.abstract_object import AbstractObject + +config_loader = ConfigLoader() +r_db = config_loader.get_db_conn("Kvrocks_DB") +config_loader = None + +#### FUNCTIONS #### + +# --- FUNCTIONS --- # + +# MISPExporter -> return correct exporter by type ???? +class TheHiveExporter(AbstractExporter, ABC): + """TheHive Exporter + + :param url: URL of the Hive instance you want to connect to + :param key: API key of the user you want to use + :param ssl: can be True or False (to check or to not check the validity of the certificate. + Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain) + """ + + def __init__(self, url='', key='', ssl=False): + super().__init__() + + if url and key: + self.url = url + self.key = key + self.ssl = ssl + if self.ssl is False: + urllib3_disable_warnings() + elif url or key: + raise Exception('Error: missing url or api key') + else: + try: + from theHiveKEYS import the_hive_url, the_hive_key, the_hive_verifycert + self.url = the_hive_url + self.key = the_hive_key + self.ssl = the_hive_verifycert + if self.ssl is False: + urllib3_disable_warnings() + if self.url.endswith('/'): + self.url = self.url[:-1] + except ModuleNotFoundError: + self.url = None + self.key = None + self.ssl = None + + def get_hive(self): + return TheHiveApi(self.url, self.key, cert=self.ssl) + + # TODO ADD TIMEOUT + # TODO return error + def ping(self): + try: + self.get_hive().get_alert('0') + return True + except thehive4py.exceptions.AlertException: + return False + + @staticmethod + def sanitize_threat_level(threat_level): + try: + int(threat_level) + if 1 <= threat_level <= 3: + return threat_level + else: + return 2 + except (TypeError, ValueError): + return 2 + + @staticmethod + def sanitize_tlp(tlp): + try: + int(tlp) + if 0 <= tlp <= 3: + return tlp + else: + return 2 + except (TypeError, ValueError): + return 2 + + @staticmethod + def sanitize_analysis(analysis): + try: + int(analysis) + if 0 <= analysis <= 2: + return analysis + else: + return 0 + except (TypeError, ValueError): + return 0 + + def get_case_url(self, case_id): + return f'{self.url}/cases/{case_id}/details' + + def create_alert(self, artifacts, source, description='AIL', source_ref=None, tags=None, tlp=3, type='ail'): + if not source_ref: + source_ref = str(uuid.uuid4())[0:6] + if tags is None: + tags = [] + + alert = Alert(title='AIL Leak', + tlp=self.sanitize_tlp(tlp), + tags=tags, + description=description, + type=type, + source=source, + sourceRef=source_ref, + artifacts=artifacts) + + # Create the Alert + alert_id = 0 + try: + req = self.get_hive().create_alert(alert) + if req.status_code == 201: + # print(json.dumps(req.json(), indent=4, sort_keys=True)) + print('Alert Created') + print(req.json()) + alert_id = req.json()['id'] + else: + # TODO LOGS + print(f'ko: {req.status_code}/{req.text}') + return 0 + except: + print('hive connection error') + return alert_id + + def create_case(self, observables, description=None, tags=None, title=None, threat_level=2, tlp=2): + if tags is None: + tags = [] + + case = Case(title=title, + tlp=self.sanitize_tlp(tlp), + severity=self.sanitize_threat_level(threat_level), + flag=False, + tags=tags, + description=description) + + # Create Case + thehive = self.get_hive() + resp = thehive.create_case(case) + if resp.status_code == 201: + case_id = resp.json()['id'] + + for observable in observables: + resp_o = thehive.create_case_observable(case_id, observable) + if resp_o.status_code != 201: + # TODO LOGS + print(f'error observable creation: {resp_o.status_code}/{resp_o.text}') + # print(case_id) + # return HIVE_URL /thehive/cases/~37040/details + return case_id + else: + # TODO LOGS + print(f'ko: {resp.status_code}/{resp.text}') + return None + + def __repr__(self): + return f'<{self.__class__.__name__}(url={self.url})' + + +class TheHiveExporterTagTrigger(TheHiveExporter): + """TheHiveExporter TagTrigger + + :param url: URL of the Hive instance you want to connect to + :param key: API key of the user you want to use + :param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain) + """ + + def __init__(self, url='', key='', ssl=False): + super().__init__(url=url, key=key, ssl=ssl) + + def export(self, item_id, tag): + item = Item(item_id) + tags = item.get_tags() + + # remove .gz from submitted path to TheHive because content is decompressed + if item_id.endswith(".gz"): + item_id = item_id[:-3] + # add .txt to make it easier to open when downloaded from TheHive + item_id = f'{item_id}.txt' + + description = f'AIL Leak, triggered by {tag}' + + artifacts = [ + AlertArtifact(dataType='other', message='uuid-ail', data=(get_ail_uuid())), + AlertArtifact(dataType='file', data=(item.get_raw_content(decompress=True), item_id), tags=tags) + ] + + return self.create_alert(artifacts, description=description, source=item.get_source(), tags=tags) + + +class TheHiveExporterItem(TheHiveExporter): + """TheHiveExporter Item case + + :param url: URL of the Hive instance you want to connect to + :param key: API key of the user you want to use + :param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain) + """ + + def __init__(self, url='', key='', ssl=False): + super().__init__(url=url, key=key, ssl=ssl) + + def export(self, item_id, description=None, title=None, threat_level=None, tlp=None): + item = Item(item_id) + date = item.get_date() + date = f'{date[0:4]}-{date[4:6]}-{date[6:8]}' + tags = item.get_tags() + ail_uuid = get_ail_uuid() + + if not title: + title = f'AIL Case {item.id}' + if not description: + description = f'AIL {ail_uuid} Case' + + observables = [ + CaseObservable(dataType="other", data=[ail_uuid], message="uuid-ail"), + CaseObservable(dataType="file", data=item.get_filename(), tags=tags), + CaseObservable(dataType="other", data=[item.get_source()], message="source"), + CaseObservable(dataType="other", data=[date], message="last-seen") + ] + + case_id = self.create_case(observables, tags=tags, description=description, title=title, + threat_level=threat_level, tlp=tlp) + + # SAVE CASE URL/ID ???? + + return case_id + + +# if __name__ == '__main__':