2018-10-24 21:30:30 +02:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
import json
|
|
|
|
from pytaxonomies import Taxonomy, Predicate, Entry
|
|
|
|
|
|
|
|
'''
|
|
|
|
Taxonomies mapping:
|
|
|
|
* disclosure-type, origin, originating-organization, exploitation-technique => part of cccs
|
|
|
|
* dos-type ~~ ddos
|
|
|
|
* report-state ~~ workflow
|
|
|
|
* malware-category ~~ malware_classification - NOPE: malware_classification has a static source @ SANS
|
|
|
|
|
|
|
|
* access-method - ack
|
|
|
|
* approved-category-of-action - ack
|
|
|
|
* cccs - ack
|
|
|
|
* interception method -> ack
|
|
|
|
|
|
|
|
* domain-category & ip-category - maybe?
|
|
|
|
* email-type - malicious email types - maybe?
|
|
|
|
* maliciousness -> maybe?
|
|
|
|
* malware-category -> yes?
|
|
|
|
* scan type: maybe?
|
|
|
|
|
|
|
|
* severity: atta&ck?
|
|
|
|
* misusage-type -> attack?
|
|
|
|
* mitigation type -> attack?
|
|
|
|
|
|
|
|
* threat-vector: languages/applications/protocols -> split
|
|
|
|
|
|
|
|
* ftp-type - request / response => object
|
|
|
|
* record type: (query/response) => part of object
|
|
|
|
|
|
|
|
* host category -> server/workstation -> part of object : network device
|
|
|
|
|
|
|
|
* method match -> HTTP request type ?!
|
|
|
|
'''
|
|
|
|
|
|
|
|
root_dir_taxonomies = Path('..')
|
|
|
|
|
|
|
|
ontology_path = Path('alfred-ontology.json')
|
|
|
|
|
|
|
|
with open(ontology_path) as f:
|
|
|
|
ontology = json.load(f)['data']
|
|
|
|
|
|
|
|
# CCCS Taxonomy
|
|
|
|
cccs = Taxonomy()
|
|
|
|
cccs.name = "cccs"
|
|
|
|
cccs.description = "Internal taxonomy for CCCS."
|
|
|
|
cccs.version = 1
|
|
|
|
cccs.expanded = "CCCS"
|
|
|
|
cccs.predicates = {}
|
|
|
|
|
|
|
|
# Tags for internal reference
|
|
|
|
predicate = Predicate()
|
|
|
|
predicate.predicate = 'event'
|
|
|
|
predicate.expanded = 'Event type'
|
|
|
|
predicate.description = 'Type of event associated to the internal reference'
|
|
|
|
predicate.entries = {}
|
|
|
|
|
|
|
|
for datatype in ontology['dataTypes']:
|
|
|
|
if 'superType' not in datatype or datatype['superType'] != 'EVENT':
|
|
|
|
continue
|
|
|
|
entry = Entry()
|
|
|
|
entry.value = datatype['name'].lower().replace('_', '-')
|
|
|
|
entry.expanded = datatype['name'].lower().replace('_', ' ').capitalize()
|
|
|
|
entry.description = datatype['description'].replace(' The value is the event ID.', '')
|
|
|
|
predicate.entries[entry.value] = entry
|
|
|
|
|
|
|
|
cccs.predicates[predicate.predicate] = predicate
|
|
|
|
|
2018-10-24 23:50:05 +02:00
|
|
|
predicate_of_cccs = ['disclosure-type', 'origin', 'originating-organization',
|
|
|
|
'exploitation-technique', 'domain-category', 'email-type',
|
|
|
|
'ip-category', 'maliciousness', 'malware-category', 'misusage-type',
|
|
|
|
'mitigation-type', 'scan-type', 'severity', 'threat-vector']
|
|
|
|
|
|
|
|
skip_for_now = []
|
|
|
|
|
|
|
|
ignore = ['dos-type', 'report-state', 'ftp-type', 'record-type', 'host-category',
|
|
|
|
'method-match']
|
2018-10-24 21:30:30 +02:00
|
|
|
|
|
|
|
for propertytype in ontology['propertyTypes']:
|
|
|
|
if 'accepts' in propertytype and propertytype['accepts']['name'] != 'list':
|
|
|
|
continue
|
|
|
|
misp_name = propertytype['name'].lower().replace('_', '-').replace(' ', '-')
|
|
|
|
if misp_name in ignore or misp_name in skip_for_now:
|
|
|
|
continue
|
|
|
|
if misp_name not in predicate_of_cccs:
|
|
|
|
new_taxonomy = Taxonomy()
|
|
|
|
new_taxonomy.name = misp_name
|
|
|
|
new_taxonomy.description = propertytype['description']
|
|
|
|
new_taxonomy.version = 1
|
|
|
|
new_taxonomy.expanded = propertytype['name'].lower().replace('_', ' ').capitalize()
|
|
|
|
new_taxonomy.predicates = {}
|
|
|
|
for value in propertytype['accepts']['values']:
|
|
|
|
predicate = Predicate()
|
|
|
|
predicate.predicate = value['name'].lower().replace('_', '-').replace(' ', '-')
|
|
|
|
predicate.expanded = value['name'].lower().replace('_', ' ').capitalize()
|
|
|
|
predicate.description = value['description']
|
|
|
|
new_taxonomy.predicates[predicate.predicate] = predicate
|
|
|
|
else:
|
|
|
|
predicate = Predicate()
|
|
|
|
predicate.predicate = misp_name
|
|
|
|
predicate.expanded = propertytype['name'].lower().replace('_', ' ').capitalize()
|
|
|
|
predicate.description = propertytype['description']
|
|
|
|
predicate.entries = {}
|
|
|
|
for value in propertytype['accepts']['values']:
|
|
|
|
entry = Entry()
|
|
|
|
entry.value = value['name'].lower().replace('_', '-').replace(' ', '-')
|
|
|
|
entry.expanded = value['name'].lower().replace('_', ' ').capitalize()
|
|
|
|
entry.description = value['description']
|
|
|
|
predicate.entries[entry.value] = entry
|
|
|
|
cccs.predicates[predicate.predicate] = predicate
|
|
|
|
|
|
|
|
if not (root_dir_taxonomies / new_taxonomy.name).exists():
|
|
|
|
(root_dir_taxonomies / new_taxonomy.name).mkdir()
|
|
|
|
|
|
|
|
if (root_dir_taxonomies / new_taxonomy.name / 'machinetag.json').exists():
|
|
|
|
with open(root_dir_taxonomies / new_taxonomy.name / 'machinetag.json') as f:
|
|
|
|
existing_taxonomy = json.load(f)
|
|
|
|
if existing_taxonomy == new_taxonomy.to_dict():
|
|
|
|
continue
|
|
|
|
new_taxonomy.version = existing_taxonomy['version'] + 1
|
|
|
|
with open(root_dir_taxonomies / new_taxonomy.name / 'machinetag.json', 'w') as f:
|
|
|
|
json.dump(new_taxonomy.to_dict(), f, indent=2)
|
|
|
|
|
|
|
|
|
|
|
|
# Dump generic CCCS taxonomy
|
|
|
|
|
|
|
|
if not (root_dir_taxonomies / cccs.name).exists():
|
|
|
|
(root_dir_taxonomies / cccs.name).mkdir()
|
|
|
|
|
|
|
|
if (root_dir_taxonomies / cccs.name / 'machinetag.json').exists():
|
|
|
|
with open(root_dir_taxonomies / cccs.name / 'machinetag.json') as f:
|
|
|
|
existing_taxonomy = json.load(f)
|
|
|
|
if existing_taxonomy != cccs.to_dict():
|
|
|
|
cccs.version = existing_taxonomy['version'] + 1
|
|
|
|
|
|
|
|
with open(root_dir_taxonomies / cccs.name / 'machinetag.json', 'w') as f:
|
|
|
|
json.dump(cccs.to_dict(), f, indent=2)
|