mirror of https://github.com/CIRCL/AIL-framework
chg: [exporter] add TheHive exporter
parent
24ffcae690
commit
11cd571f44
|
@ -0,0 +1,259 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*-coding:UTF-8 -*
|
||||||
|
"""
|
||||||
|
Importer Class
|
||||||
|
================
|
||||||
|
|
||||||
|
Import Content
|
||||||
|
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from abc import ABC
|
||||||
|
|
||||||
|
from thehive4py.api import TheHiveApi
|
||||||
|
from thehive4py.models import Alert, AlertArtifact, Case, CaseObservable
|
||||||
|
import thehive4py.exceptions
|
||||||
|
from urllib3 import disable_warnings as urllib3_disable_warnings
|
||||||
|
|
||||||
|
sys.path.append('../../configs/keys')
|
||||||
|
sys.path.append(os.environ['AIL_BIN'])
|
||||||
|
#################################
|
||||||
|
# Import Project packages
|
||||||
|
#################################
|
||||||
|
from exporter.abstract_exporter import AbstractExporter
|
||||||
|
from lib.ConfigLoader import ConfigLoader
|
||||||
|
from lib.ail_core import get_ail_uuid
|
||||||
|
from lib.objects.Items import Item
|
||||||
|
# from lib.objects.abstract_object import AbstractObject
|
||||||
|
|
||||||
|
config_loader = ConfigLoader()
|
||||||
|
r_db = config_loader.get_db_conn("Kvrocks_DB")
|
||||||
|
config_loader = None
|
||||||
|
|
||||||
|
#### FUNCTIONS ####
|
||||||
|
|
||||||
|
# --- FUNCTIONS --- #
|
||||||
|
|
||||||
|
# MISPExporter -> return correct exporter by type ????
|
||||||
|
class TheHiveExporter(AbstractExporter, ABC):
|
||||||
|
"""TheHive Exporter
|
||||||
|
|
||||||
|
:param url: URL of the Hive instance you want to connect to
|
||||||
|
:param key: API key of the user you want to use
|
||||||
|
:param ssl: can be True or False (to check or to not check the validity of the certificate.
|
||||||
|
Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, url='', key='', ssl=False):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
if url and key:
|
||||||
|
self.url = url
|
||||||
|
self.key = key
|
||||||
|
self.ssl = ssl
|
||||||
|
if self.ssl is False:
|
||||||
|
urllib3_disable_warnings()
|
||||||
|
elif url or key:
|
||||||
|
raise Exception('Error: missing url or api key')
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
from theHiveKEYS import the_hive_url, the_hive_key, the_hive_verifycert
|
||||||
|
self.url = the_hive_url
|
||||||
|
self.key = the_hive_key
|
||||||
|
self.ssl = the_hive_verifycert
|
||||||
|
if self.ssl is False:
|
||||||
|
urllib3_disable_warnings()
|
||||||
|
if self.url.endswith('/'):
|
||||||
|
self.url = self.url[:-1]
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
self.url = None
|
||||||
|
self.key = None
|
||||||
|
self.ssl = None
|
||||||
|
|
||||||
|
def get_hive(self):
|
||||||
|
return TheHiveApi(self.url, self.key, cert=self.ssl)
|
||||||
|
|
||||||
|
# TODO ADD TIMEOUT
|
||||||
|
# TODO return error
|
||||||
|
def ping(self):
|
||||||
|
try:
|
||||||
|
self.get_hive().get_alert('0')
|
||||||
|
return True
|
||||||
|
except thehive4py.exceptions.AlertException:
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def sanitize_threat_level(threat_level):
|
||||||
|
try:
|
||||||
|
int(threat_level)
|
||||||
|
if 1 <= threat_level <= 3:
|
||||||
|
return threat_level
|
||||||
|
else:
|
||||||
|
return 2
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return 2
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def sanitize_tlp(tlp):
|
||||||
|
try:
|
||||||
|
int(tlp)
|
||||||
|
if 0 <= tlp <= 3:
|
||||||
|
return tlp
|
||||||
|
else:
|
||||||
|
return 2
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return 2
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def sanitize_analysis(analysis):
|
||||||
|
try:
|
||||||
|
int(analysis)
|
||||||
|
if 0 <= analysis <= 2:
|
||||||
|
return analysis
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def get_case_url(self, case_id):
|
||||||
|
return f'{self.url}/cases/{case_id}/details'
|
||||||
|
|
||||||
|
def create_alert(self, artifacts, source, description='AIL', source_ref=None, tags=None, tlp=3, type='ail'):
|
||||||
|
if not source_ref:
|
||||||
|
source_ref = str(uuid.uuid4())[0:6]
|
||||||
|
if tags is None:
|
||||||
|
tags = []
|
||||||
|
|
||||||
|
alert = Alert(title='AIL Leak',
|
||||||
|
tlp=self.sanitize_tlp(tlp),
|
||||||
|
tags=tags,
|
||||||
|
description=description,
|
||||||
|
type=type,
|
||||||
|
source=source,
|
||||||
|
sourceRef=source_ref,
|
||||||
|
artifacts=artifacts)
|
||||||
|
|
||||||
|
# Create the Alert
|
||||||
|
alert_id = 0
|
||||||
|
try:
|
||||||
|
req = self.get_hive().create_alert(alert)
|
||||||
|
if req.status_code == 201:
|
||||||
|
# print(json.dumps(req.json(), indent=4, sort_keys=True))
|
||||||
|
print('Alert Created')
|
||||||
|
print(req.json())
|
||||||
|
alert_id = req.json()['id']
|
||||||
|
else:
|
||||||
|
# TODO LOGS
|
||||||
|
print(f'ko: {req.status_code}/{req.text}')
|
||||||
|
return 0
|
||||||
|
except:
|
||||||
|
print('hive connection error')
|
||||||
|
return alert_id
|
||||||
|
|
||||||
|
def create_case(self, observables, description=None, tags=None, title=None, threat_level=2, tlp=2):
|
||||||
|
if tags is None:
|
||||||
|
tags = []
|
||||||
|
|
||||||
|
case = Case(title=title,
|
||||||
|
tlp=self.sanitize_tlp(tlp),
|
||||||
|
severity=self.sanitize_threat_level(threat_level),
|
||||||
|
flag=False,
|
||||||
|
tags=tags,
|
||||||
|
description=description)
|
||||||
|
|
||||||
|
# Create Case
|
||||||
|
thehive = self.get_hive()
|
||||||
|
resp = thehive.create_case(case)
|
||||||
|
if resp.status_code == 201:
|
||||||
|
case_id = resp.json()['id']
|
||||||
|
|
||||||
|
for observable in observables:
|
||||||
|
resp_o = thehive.create_case_observable(case_id, observable)
|
||||||
|
if resp_o.status_code != 201:
|
||||||
|
# TODO LOGS
|
||||||
|
print(f'error observable creation: {resp_o.status_code}/{resp_o.text}')
|
||||||
|
# print(case_id)
|
||||||
|
# return HIVE_URL /thehive/cases/~37040/details
|
||||||
|
return case_id
|
||||||
|
else:
|
||||||
|
# TODO LOGS
|
||||||
|
print(f'ko: {resp.status_code}/{resp.text}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'<{self.__class__.__name__}(url={self.url})'
|
||||||
|
|
||||||
|
|
||||||
|
class TheHiveExporterTagTrigger(TheHiveExporter):
|
||||||
|
"""TheHiveExporter TagTrigger
|
||||||
|
|
||||||
|
:param url: URL of the Hive instance you want to connect to
|
||||||
|
:param key: API key of the user you want to use
|
||||||
|
:param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, url='', key='', ssl=False):
|
||||||
|
super().__init__(url=url, key=key, ssl=ssl)
|
||||||
|
|
||||||
|
def export(self, item_id, tag):
|
||||||
|
item = Item(item_id)
|
||||||
|
tags = item.get_tags()
|
||||||
|
|
||||||
|
# remove .gz from submitted path to TheHive because content is decompressed
|
||||||
|
if item_id.endswith(".gz"):
|
||||||
|
item_id = item_id[:-3]
|
||||||
|
# add .txt to make it easier to open when downloaded from TheHive
|
||||||
|
item_id = f'{item_id}.txt'
|
||||||
|
|
||||||
|
description = f'AIL Leak, triggered by {tag}'
|
||||||
|
|
||||||
|
artifacts = [
|
||||||
|
AlertArtifact(dataType='other', message='uuid-ail', data=(get_ail_uuid())),
|
||||||
|
AlertArtifact(dataType='file', data=(item.get_raw_content(decompress=True), item_id), tags=tags)
|
||||||
|
]
|
||||||
|
|
||||||
|
return self.create_alert(artifacts, description=description, source=item.get_source(), tags=tags)
|
||||||
|
|
||||||
|
|
||||||
|
class TheHiveExporterItem(TheHiveExporter):
|
||||||
|
"""TheHiveExporter Item case
|
||||||
|
|
||||||
|
:param url: URL of the Hive instance you want to connect to
|
||||||
|
:param key: API key of the user you want to use
|
||||||
|
:param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, url='', key='', ssl=False):
|
||||||
|
super().__init__(url=url, key=key, ssl=ssl)
|
||||||
|
|
||||||
|
def export(self, item_id, description=None, title=None, threat_level=None, tlp=None):
|
||||||
|
item = Item(item_id)
|
||||||
|
date = item.get_date()
|
||||||
|
date = f'{date[0:4]}-{date[4:6]}-{date[6:8]}'
|
||||||
|
tags = item.get_tags()
|
||||||
|
ail_uuid = get_ail_uuid()
|
||||||
|
|
||||||
|
if not title:
|
||||||
|
title = f'AIL Case {item.id}'
|
||||||
|
if not description:
|
||||||
|
description = f'AIL {ail_uuid} Case'
|
||||||
|
|
||||||
|
observables = [
|
||||||
|
CaseObservable(dataType="other", data=[ail_uuid], message="uuid-ail"),
|
||||||
|
CaseObservable(dataType="file", data=item.get_filename(), tags=tags),
|
||||||
|
CaseObservable(dataType="other", data=[item.get_source()], message="source"),
|
||||||
|
CaseObservable(dataType="other", data=[date], message="last-seen")
|
||||||
|
]
|
||||||
|
|
||||||
|
case_id = self.create_case(observables, tags=tags, description=description, title=title,
|
||||||
|
threat_level=threat_level, tlp=tlp)
|
||||||
|
|
||||||
|
# SAVE CASE URL/ID ????
|
||||||
|
|
||||||
|
return case_id
|
||||||
|
|
||||||
|
|
||||||
|
# if __name__ == '__main__':
|
Loading…
Reference in New Issue