Cleanup of code and 'quick-n-dirty' sanitizing of tags

pull/508/head
Koen Van Impe 2019-12-27 16:19:51 +01:00
parent acae958947
commit ca2049e9ae
1 changed files with 405 additions and 405 deletions

164
examples/stats_report.py Normal file → Executable file
View File

@ -32,7 +32,6 @@ import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def init(url, key, verifycert):
'''
Template to get MISP module started
@ -40,7 +39,6 @@ def init(url, key, verifycert):
return ExpandedPyMISP(url, key, verifycert, 'json')
def get_data(misp, timeframe, date_from=None, date_to=None):
'''
Get the event date to build our report
@ -181,8 +179,7 @@ def get_data(misp, timeframe, date_from = None, date_to = None):
sys.exit('Unable to get statistics from MISP')
def build_report(report, timeframe, misp_url):
def build_report(report, timeframe, misp_url, sanitize_report=True):
'''
Build the body of the report and optional attachments
'''
@ -216,91 +213,49 @@ def build_report(report, timeframe, misp_url):
report_body = report_body + '\n #%s %s (%s) \t%s \n\t\t\t\t(Date: %s, Updated: %s, Published: %s)' % (el['id'], el['threat_level'], el['analysis_completion'], el['title'].decode('utf-8'), el['date'], el['timestamp'], el['publish_timestamp'])
attachments['misp_events'] = attachments['misp_events'] + '\n%s;%s;%s;%s;%s;%s;%s' % (el['id'], el['title'].decode('utf-8'), el['date'], el['timestamp'], el['publish_timestamp'], el['threat_level'], el['analysis_completion'])
report_body = report_body + '\n\n'
report_body, attachments['attr_category'] = add_report_body(report_body, 'New or updated attributes - Category', report['attr_category'], 'AttributeCategory;Qt')
report_body, attachments['attr_type'] = add_report_body(report_body, 'New or updated attributes - Type', report['attr_type'], 'AttributeType;Qt')
report_body, attachments['tags_tlp'] = add_report_body(report_body, 'TLP Codes', report['tags_tlp'], 'TLP;Qt')
report_body, attachments['tags_misp_galaxy'] = add_report_body(report_body, 'Tag MISP Galaxy', report['tags_misp_galaxy'], 'MISPGalaxy;Qt')
report_body, attachments['tags_misp_galaxy_mitre'] = add_report_body(report_body, 'Tag MISP Galaxy Mitre', report['tags_misp_galaxy_mitre'], 'MISPGalaxyMitre;Qt')
report_body, attachments['tags_misp_galaxy_threat_actor'] = add_report_body(report_body, 'Tag MISP Galaxy Threat Actor', report['tags_misp_galaxy_threat_actor'], 'MISPGalaxyThreatActor;Qt')
report_body, attachments['tags_type'] = add_report_body(report_body, 'Tags', report['tags_type'], 'Tag;Qt')
report_body, attachments['galaxies'] = add_report_body(report_body, 'Galaxies', report['galaxies'], 'Galaxies;Qt')
report_body, attachments['galaxies_cluster'] = add_report_body(report_body, 'Galaxies Cluster', report['galaxies_cluster'], 'Galaxies;Qt')
report_body = report_body + '\nNew or updated attributes - Category \n-------------------------------------------------------------------------------'
attr_category_s = sorted(report['attr_category'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['attr_category'] = 'AttributeCategory;Qt'
for el in attr_category_s:
report_body = report_body + '\n%s \t %s' % (el[0], el[1])
attachments['attr_category'] = attachments['attr_category'] + '\n%s;%s' % (el[0], el[1])
if sanitize_report:
mitre_tactic = get_sanitized_report(report['tags_misp_galaxy_mitre'], 'ATT&CK Tactic')
mitre_group = get_sanitized_report(report['tags_misp_galaxy_mitre'], 'ATT&CK Group')
mitre_software = get_sanitized_report(report['tags_misp_galaxy_mitre'], 'ATT&CK Software')
threat_actor = get_sanitized_report(report['tags_misp_galaxy_threat_actor'], 'MISP Threat Actor')
misp_tag = get_sanitized_report(report['tags_type'], 'MISP Tags', False, True)
report_body = report_body + '\n\n'
report_body = report_body + '\nNew or updated attributes - Type \n-------------------------------------------------------------------------------'
attr_type_s = sorted(report['attr_type'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['attr_type'] = 'AttributeType;Qt'
for el in attr_type_s:
report_body = report_body + '\n%s \t %s' % (el[0], el[1])
attachments['attr_type'] = attachments['attr_type'] + '\n%s;%s' % (el[0], el[1])
report_body = report_body + '\n\n'
report_body = report_body + '\nTLP Codes \n-------------------------------------------------------------------------------'
attachments['tags_tlp'] = 'TLP;Qt'
for el in report['tags_tlp']:
report_body = report_body + "\n%s \t %s" % (el, report['tags_tlp'][el])
attachments['tags_tlp'] = attachments['tags_tlp'] + '\n%s;%s' % (el, report['tags_tlp'][el])
report_body = report_body + '\n\n'
report_body = report_body + '\nTag MISP Galaxy\n-------------------------------------------------------------------------------'
tags_misp_galaxy_s = sorted(report['tags_misp_galaxy'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['tags_misp_galaxy'] = 'MISPGalaxy;Qt'
for el in tags_misp_galaxy_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
attachments['tags_misp_galaxy'] = attachments['tags_misp_galaxy'] + '\n%s;%s' % (el[0], el[1])
report_body = report_body + '\n\n'
report_body = report_body + '\nTag MISP Galaxy Mitre \n-------------------------------------------------------------------------------'
tags_misp_galaxy_mitre_s = sorted(report['tags_misp_galaxy_mitre'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['tags_misp_galaxy_mitre'] = 'MISPGalaxyMitre;Qt'
for el in tags_misp_galaxy_mitre_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
attachments['tags_misp_galaxy_mitre'] = attachments['tags_misp_galaxy_mitre'] + '\n%s;%s' % (el[0], el[1])
report_body = report_body + '\n\n'
report_body = report_body + '\nTag MISP Galaxy Threat Actor \n-------------------------------------------------------------------------------'
tags_misp_galaxy_threat_actor_s = sorted(report['tags_misp_galaxy_threat_actor'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['tags_misp_galaxy_threat_actor'] = 'MISPGalaxyThreatActor;Qt'
for el in tags_misp_galaxy_threat_actor_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
attachments['tags_misp_galaxy_threat_actor'] = attachments['tags_misp_galaxy_threat_actor'] + '\n%s;%s' % (el[0], el[1])
report_body = report_body + '\n\n'
report_body = report_body + '\nTags \n-------------------------------------------------------------------------------'
tags_type_s = sorted(report['tags_type'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['tags_type'] = 'Tag;Qt'
for el in tags_type_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
attachments['tags_type'] = attachments['tags_type'] + '\n%s;%s' % (el[0], el[1])
report_body = report_body + '\n\n'
report_body = report_body + '\nGalaxies \n-------------------------------------------------------------------------------'
galaxies_s = sorted(report['galaxies'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['galaxies'] = 'Galaxies;Qt'
for el in galaxies_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
attachments['galaxies'] = attachments['galaxies'] + '\n%s;%s' % (el[0], el[1])
report_body = report_body + '\n\n'
report_body = report_body + '\nGalaxies Cluster \n-------------------------------------------------------------------------------'
galaxies_cluster_s = sorted(report['galaxies_cluster'].items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
attachments['galaxies_cluster'] = 'Galaxies;Qt'
for el in galaxies_cluster_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
attachments['galaxies_cluster'] = attachments['galaxies_cluster'] + '\n%s;%s' % (el[0], el[1])
report_body, attachments['mitre_tactics'] = add_report_body(report_body, 'MITRE ATT&CK Tactics (sanitized)', mitre_tactic, 'MITRETactics;Qt')
report_body, attachments['mitre_group'] = add_report_body(report_body, 'MITRE ATT&CK Group (sanitized)', mitre_group, 'MITREGroup;Qt')
report_body, attachments['mitre_software'] = add_report_body(report_body, 'MITRE ATT&CK Software (sanitized)', mitre_software, 'MITRESoftware;Qt')
report_body, attachments['threat_actor'] = add_report_body(report_body, 'MISP Threat Actor (sanitized)', threat_actor, 'MISPThreatActor;Qt')
report_body, attachments['misp_tag'] = add_report_body(report_body, 'Tags (sanitized)', misp_tag, 'MISPTags;Qt')
report_body = report_body + "\n\nMISP Reporter Finished\n"
return report_body, attachments
def add_report_body(report_body, subtitle, data_object, csv_title):
'''
Add a section to the report body text
'''
if report_body:
report_body = report_body + '\n\n'
report_body = report_body + '\n%s\n-------------------------------------------------------------------------------' % subtitle
data_object_s = sorted(data_object.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
csv_attachment = csv_title
for el in data_object_s:
report_body = report_body + "\n%s \t %s" % (el[0], el[1])
csv_attachment = csv_attachment + '\n%s;%s' % (el[0], el[1])
return report_body, csv_attachment
def msg_attach(content, filename):
'''
@ -312,7 +267,6 @@ def msg_attach(content, filename):
return part
def print_report(report_body, attachments, smtp_from, smtp_to, smtp_server, misp_url):
'''
Print (or send) the report
@ -348,6 +302,11 @@ def print_report(report_body, attachments, smtp_from, smtp_to, smtp_server, misp
msg.attach(msg_attach(attachments['tags_type'], 'tags_type.csv'))
msg.attach(msg_attach(attachments['galaxies'], 'galaxies.csv'))
msg.attach(msg_attach(attachments['galaxies_cluster'], 'galaxies_cluster.csv'))
msg.attach(msg_attach(attachments['misp_tag'], 'misp_tag.csv'))
msg.attach(msg_attach(attachments['threat_actor'], 'threat_actor.csv'))
msg.attach(msg_attach(attachments['mitre_software'], 'mitre_software.csv'))
msg.attach(msg_attach(attachments['mitre_group'], 'mitre_group.csv'))
msg.attach(msg_attach(attachments['mitre_tactics'], 'mitre_tactics.csv'))
server = smtplib.SMTP(smtp_server)
server.sendmail(smtp_from, smtp_to, msg.as_string())
@ -356,6 +315,47 @@ def print_report(report_body, attachments, smtp_from, smtp_to, smtp_server, misp
print(report_body)
def get_sanitized_report(dataset, sanitize_selector='ATT&CK Tactic', lower=False, add_not_sanitized=False):
'''
Remove or bundle some of the tags
'quick'n'dirty ; could also do this by using the galaxy/tags definition
'''
# If you add the element completely then it gets removed by an empty string; this allows to filter out non-relevant items
sanitize_set = {
'ATT&CK Tactic': ['misp-galaxy:mitre-enterprise-attack-pattern="', 'misp-galaxy:mitre-pre-attack-pattern="', 'misp-galaxy:mitre-mobile-attack-pattern="', 'misp-galaxy:mitre-attack-pattern="', 'misp-galaxy:mitre-enterprise-attack-attack-pattern="', 'misp-galaxy:mitre-pre-attack-attack-pattern="', 'misp-galaxy:mitre-enterprise-attack-attack-pattern="', 'misp-galaxy:mitre-mobile-attack-attack-pattern="'],
'ATT&CK Group': ['misp-galaxy:mitre-enterprise-intrusion-set="', 'misp-galaxy:mitre-pre-intrusion-set="', 'misp-galaxy:mitre-mobile-intrusion-set="', 'misp-galaxy:mitre-intrusion-set="', 'misp-galaxy:mitre-enterprise-attack-intrusion-set="', 'misp-galaxy:mitre-pre-attack-intrusion-set="', 'misp-galaxy:mitre-mobile-attack-intrusion-set="'],
'ATT&CK Software': ['misp-galaxy:mitre-enterprise-malware="', 'misp-galaxy:mitre-pre-malware="', 'misp-galaxy:mitre-mobile-malware="', 'misp-galaxy:mitre-malware="', 'misp-galaxy:mitre-enterprise-attack-tool="', 'misp-galaxy:mitre-enterprise-tool="', 'misp-galaxy:mitre-pre-tool="', 'misp-galaxy:mitre-mobile-tool="', 'misp-galaxy:mitre-tool="', 'misp-galaxy:mitre-enterprise-attack-malware="'],
'MISP Threat Actor': ['misp-galaxy:threat-actor="'],
'MISP Tags': ['circl:incident-classification="', 'osint:source-type="blog-post"', 'misp-galaxy:tool="', 'CERT-XLM:malicious-code="', 'circl:topic="', 'ddos:type="', 'ecsirt:fraud="', 'dnc:malware-type="', 'enisa:nefarious-activity-abuse="', 'europol-incident:information-gathering="', 'misp-galaxy:ransomware="', 'misp-galaxy:rat="', 'misp-galaxy:social-dark-patterns="', 'misp-galaxy:tool="', 'misp:threat-level="', 'ms-caro-malware:malware-platform=', 'ms-caro-malware:malware-type=', 'veris:security_incident="', 'veris:attribute:integrity:variety="', 'veris:actor:motive="', 'misp-galaxy:banker="', 'misp-galaxy:malpedia="', 'misp-galaxy:botnet="', 'malware_classification:malware-category="', 'TLP: white', 'TLP: Green',
'inthreat:event-src="feed-osint"', 'tlp:white', 'tlp:amber', 'tlp:green', 'tlp:red', 'osint:source-type="blog-post"', 'Partner Feed', 'IBM XForce', 'type:OSINT', 'malware:', 'osint:lifetime="perpetual"', 'Actor:', 'osint:certainty="50"', 'Banker:', 'Group:', 'Threat:',
'ncsc-nl-ndn:feed="selected"', 'misp-galaxy:microsoft-activity-group="', 'admiralty-scale:source-reliability="b"', 'admiralty-scale:source-reliability="a"', 'admiralty-scale:information-credibility="2"', 'admiralty-scale:information-credibility="3"',
'feed:source="CESICAT"', 'osint:source-type="automatic-analysis"', 'workflow:state="complete"', 'osint:source-type="technical-report"',
'csirt_case_classification:incident-category="', 'dnc:driveby-type="', 'veris:action:social:variety="', 'osint:source-type="',
'osint:source-type="microblog-post"', 'ecsirt:malicious-code="', 'misp-galaxy:sector="', 'veris:action:variety=', 'label=', 'csirt_case_classification:incident-category="', 'admiralty-scale:source-reliability="c"', 'workflow:todo="review"', 'LDO-CERT:detection="toSIEM"', 'Threat tlp:White', 'Threat Type:', 'adversary:infrastructure-state="active"', 'cirl:incident-classification:', 'misp-galaxy:android="', 'dnc:infrastructure-type="', 'ecsirt:information-gathering="', 'ecsirt:intrusions="', 'dhs-ciip-sectors:DHS-critical-sectors="', 'malware_classification:obfuscation-technique="no-obfuscation"',
'riskiq:threat-type="', 'veris:action:hacking:variety="', 'veris:action:social:target="', 'workflow:state="incomplete"', 'workflow:todo="add-tagging"', 'workflow:todo="add-context"', 'europol-incident:availability="', 'label=', 'misp-galaxy:stealer="', 'misp-galaxy:exploit-kit="', 'rsit:availability="', 'rsit:fraud="', 'ransomware:type="', 'veris:action:variety=', 'malware:',
'ecsirt:abusive-content="']}
if sanitize_selector == 'MISP Tags':
sanitize_set['MISP Tags'] = sanitize_set['MISP Tags'] + sanitize_set['ATT&CK Tactic'] + sanitize_set['ATT&CK Group'] + sanitize_set['ATT&CK Software'] + sanitize_set['MISP Threat Actor']
result_sanitize_set = {}
if dataset:
for element in dataset:
sanited = False
for sanitize_el in sanitize_set[sanitize_selector]:
if sanitize_el in element:
sanited = True
new_el = element.replace(sanitize_el, '').replace('"', '').strip()
if lower:
new_el = new_el.lower()
result_sanitize_set[new_el] = dataset[element]
if add_not_sanitized and not sanited:
new_el = element.strip()
if lower:
new_el = new_el.lower()
result_sanitize_set[new_el] = dataset[element]
return result_sanitize_set
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Generate a report of your MISP statistics.')