Merge pull request #422 from trustar/feat/EN-5047/MISP-manual-update

Feat/en 5047/misp manual update
pull/425/head
Alexandre Dulaunoy 2020-08-11 06:59:33 +02:00 committed by GitHub
commit 7021a0d657
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 113 additions and 30 deletions

View File

@ -1,8 +1,11 @@
import json
import pymisp
from base64 import b64encode
from collections import OrderedDict
from . import check_input_attribute, checking_error, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
from trustar import TruStar
from trustar import TruStar, Indicator
from urllib.parse import quote
misperrors = {'error': "Error"}
mispattributes = {
@ -34,9 +37,13 @@ class TruSTARParser:
'SHA256': "sha256"
}
# Relevant fields from each TruSTAR endpoint
SUMMARY_FIELDS = ["severityLevel", "source", "score", "attributes"]
METADATA_FIELDS = ["sightings", "first_seen", "last_seen", "tags"]
REPORT_BASE_URL = "https://station.trustar.co/constellation/reports/{}"
CLIENT_METATAG = "MISP-{}".format(pymisp.__version__)
CLIENT_METATAG = f"MISP-{pymisp.__version__}"
def __init__(self, attribute, config):
config['enclave_ids'] = config.get('enclave_ids', "").strip().split(',')
@ -52,45 +59,111 @@ class TruSTARParser:
"""
Returns the MISP Event enriched with TruSTAR indicator summary data.
"""
try:
event = json.loads(self.misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {'results': results}
except Exception as e:
misperrors['error'] += f" -- Encountered issue serializing enrichment data -- {e}"
return misperrors
def generate_trustar_links(self, entity_value):
def generate_trustar_link(self, entity_type, entity_value):
"""
Generates links to TruSTAR reports if they exist.
Generates link to TruSTAR report of entity.
:param entity_type: <str> Type of entity.
:param entity_value: <str> Value of entity.
:return: <str> Link to indicator report in TruSTAR platform.
"""
report_links = list()
trustar_reports = self.ts_client.search_reports(entity_value)
for report in trustar_reports:
report_links.append(self.REPORT_BASE_URL.format(report.id))
report_id = b64encode(quote(f"{entity_type}|{entity_value}").encode()).decode()
return report_links
return self.REPORT_BASE_URL.format(report_id)
def parse_indicator_summary(self, summaries):
@staticmethod
def extract_tags(enrichment_report):
"""
Converts a response from the TruSTAR /1.3/indicators/summaries endpoint
a MISP trustar_report object and adds the summary data and links as attributes.
Extracts tags from the enrichment report in order to add them
to the TruSTAR MISP Object. Removes tags from report to avoid
redundancy.
:param summaries: <generator> A TruSTAR Python SDK Page.generator object for generating
indicator summaries pages.
:param: <OrderedDict> Enrichment data.
:return: <list> List of tags.
"""
if enrichment_report and enrichment_report.get('tags'):
return [tag.get('name') for tag in enrichment_report.pop('tags')]
return None
def generate_enrichment_report(self, summary, metadata):
"""
Extracts desired fields from summary and metadata reports and
generates an enrichment report.
:param summary: <trustar.IndicatorSummary> Indicator summary report.
:param metadata: <trustar.Indicator> Indicator metadata report.
:return: <str> Enrichment report.
"""
# Preserve order of fields as they exist in SUMMARY_FIELDS and METADATA_FIELDS
enrichment_report = OrderedDict()
if summary:
summary_dict = summary.to_dict()
enrichment_report.update(
{field: summary_dict[field] for field in self.SUMMARY_FIELDS if summary_dict.get(field)})
if metadata:
metadata_dict = metadata.to_dict()
enrichment_report.update(
{field: metadata_dict[field] for field in self.METADATA_FIELDS if metadata_dict.get(field)})
return enrichment_report
def parse_indicator_summary(self, indicator, summary, metadata):
"""
Pulls enrichment data from the TruSTAR /indicators/summaries and /indicators/metadata endpoints
and creates a MISP trustar_report.
:param indicator: <str> Value of the attribute
:summary: <trustar.IndicatorSummary> Indicator summary response object.
:metadata: <trustar.Indicator> Indicator response object.
"""
for summary in summaries:
trustar_obj = MISPObject('trustar_report')
# Verify that the indicator type is supported by TruSTAR
if summary and summary.indicator_type in self.ENTITY_TYPE_MAPPINGS:
indicator_type = summary.indicator_type
indicator_value = summary.value
if indicator_type in self.ENTITY_TYPE_MAPPINGS:
elif metadata and metadata.type in self.ENTITY_TYPE_MAPPINGS:
indicator_type = metadata.type
else:
misperrors['error'] += " -- Attribute not found or not supported"
raise Exception
try:
# Extract most relevant fields from indicator summary and metadata responses
enrichment_report = self.generate_enrichment_report(summary, metadata)
tags = self.extract_tags(enrichment_report)
if enrichment_report:
# Create MISP trustar_report object and populate it with enrichment data
trustar_obj = MISPObject('trustar_report')
trustar_obj.add_attribute(indicator_type, attribute_type=self.ENTITY_TYPE_MAPPINGS[indicator_type],
value=indicator_value)
value=indicator)
trustar_obj.add_attribute("INDICATOR_SUMMARY", attribute_type="text",
value=json.dumps(summary.to_dict(), sort_keys=True, indent=4))
report_links = self.generate_trustar_links(indicator_value)
for link in report_links:
trustar_obj.add_attribute("REPORT_LINK", attribute_type="link", value=link)
value=json.dumps(enrichment_report, indent=4))
report_link = self.generate_trustar_link(indicator_type, indicator)
trustar_obj.add_attribute("REPORT_LINK", attribute_type="link", value=report_link)
self.misp_event.add_object(**trustar_obj)
elif not tags:
# If enrichment report is empty and there are no tags, nothing to add to attribute
raise Exception("No relevant data found")
if tags:
for tag in tags:
self.misp_event.add_attribute_tag(tag, indicator)
except Exception as e:
misperrors['error'] += f" -- Error enriching attribute {indicator} -- {e}"
raise e
def handler(q=False):
@ -117,15 +190,25 @@ def handler(q=False):
if attribute['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
trustar_parser = TruSTARParser(attribute, config)
metadata = None
summary = None
try:
summaries = list(
trustar_parser.ts_client.get_indicator_summaries([attribute['value']], page_size=MAX_PAGE_SIZE))
metadata = trustar_parser.ts_client.get_indicators_metadata([Indicator(value=attribute['value'])])[0]
except Exception as e:
misperrors['error'] = "Unable to retrieve TruSTAR summary data: {}".format(e)
misperrors['error'] += f" -- Could not retrieve indicator metadata from TruSTAR {e}"
try:
summary = list(
trustar_parser.ts_client.get_indicator_summaries([attribute['value']], page_size=MAX_PAGE_SIZE))[0]
except Exception as e:
misperrors['error'] += f" -- Unable to retrieve TruSTAR summary data: {e}"
try:
trustar_parser.parse_indicator_summary(attribute['value'], summary, metadata)
except Exception:
return misperrors
trustar_parser.parse_indicator_summary(summaries)
return trustar_parser.get_results()