mirror of https://github.com/MISP/PyTaxonomies
302 lines
9.8 KiB
Python
302 lines
9.8 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import collections
import json
import os
import re
import sys
from json import JSONEncoder

try:
    from collections.abc import Mapping
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Mapping
|
|
|
|
try:
|
|
import requests
|
|
HAS_REQUESTS = True
|
|
except ImportError:
|
|
HAS_REQUESTS = False
|
|
|
|
try:
|
|
import jsonschema
|
|
HAS_JSONSCHEMA = True
|
|
except ImportError:
|
|
HAS_JSONSCHEMA = False
|
|
|
|
|
|
class EncodeTaxonomies(JSONEncoder):
    """JSON encoder that serialises taxonomy objects through ``to_dict()``."""

    def default(self, obj):
        """Return a JSON-serialisable stand-in for *obj*.

        Taxonomy, Predicate and Entry instances are replaced by the result
        of their ``to_dict()`` method; every other object is handed to the
        stock ``JSONEncoder.default`` implementation.
        """
        handled_types = (Taxonomy, Predicate, Entry)
        if not isinstance(obj, handled_types):
            return super(EncodeTaxonomies, self).default(obj)
        return obj.to_dict()
|
|
|
|
|
|
class Entry:
    """One entry of a predicate, built from its dictionary description."""

    def __init__(self, entry):
        """Copy the fields of the *entry* mapping onto the instance.

        ``entry['value']`` is mandatory (a missing key raises ``KeyError``);
        ``expanded``, ``colour``, ``description`` and ``numerical_value``
        are read with ``.get()`` and are therefore ``None`` when absent.
        """
        self.value = entry['value']
        for field in ('expanded', 'colour', 'description', 'numerical_value'):
            setattr(self, field, entry.get(field))

    def to_dict(self):
        """Return the entry as a plain dict, leaving out unset fields."""
        as_dict = {'value': self.value}
        # The text fields are dropped when falsy (None or empty string).
        for field in ('expanded', 'colour', 'description'):
            content = getattr(self, field)
            if content:
                as_dict[field] = content
        # A numerical value of 0 is meaningful, so only None is skipped.
        if self.numerical_value is not None:
            as_dict['numerical_value'] = self.numerical_value
        return as_dict

    def to_json(self):
        """Return the entry serialised to a JSON string."""
        return json.dumps(self, cls=EncodeTaxonomies)

    def __str__(self):
        return self.value
|
|
|
|
|
|
class Predicate(Mapping):
    """A predicate of a taxonomy together with its entries.

    Instances behave as a read-only mapping from entry value to
    :class:`Entry`; a predicate without entries has length 0 and is falsy.

    The base class is ``Mapping`` from ``collections.abc``: the
    ``collections.Mapping`` alias no longer exists on Python 3.10+.
    """

    def __init__(self, predicate, entries):
        """Build the predicate.

        :param predicate: mapping describing the predicate; ``'value'`` is
            mandatory (``KeyError`` if missing), while ``'expanded'``,
            ``'description'``, ``'colour'`` and ``'exclusive'`` are optional.
        :param entries: iterable of entry mappings (each with a ``'value'``
            key), or ``None``/empty when the predicate has no entries.
        """
        self.predicate = predicate['value']
        self.expanded = predicate.get('expanded')
        self.description = predicate.get('description')
        self.colour = predicate.get('colour')
        self.exclusive = predicate.get('exclusive')
        self.__init_entries(entries)

    def __init_entries(self, entries):
        """Populate ``self.entries``, keyed by each entry's value."""
        self.entries = {}
        if entries:
            for e in entries:
                self.entries[e['value']] = Entry(e)

    def to_dict(self):
        """Return the predicate as a plain dict, leaving out unset fields.

        When the predicate has entries they are returned under
        ``'entries'`` as a list of the stored entry objects.
        """
        to_return = {'value': self.predicate}
        if self.expanded:
            to_return['expanded'] = self.expanded
        if self.description:
            to_return['description'] = self.description
        if self.colour:
            to_return['colour'] = self.colour
        if self.exclusive:
            to_return['exclusive'] = self.exclusive
        if self.entries:
            # A real list rather than a ValuesView: the JSON encoder cannot
            # serialise a view, which made to_json() fail with TypeError.
            to_return['entries'] = list(self.values())
        return to_return

    def to_json(self):
        """Return the predicate serialised to a JSON string."""
        return json.dumps(self, cls=EncodeTaxonomies)

    def __str__(self):
        return self.predicate

    def __getitem__(self, entry):
        return self.entries[entry]

    def __iter__(self):
        return iter(self.entries)

    def __len__(self):
        return len(self.entries)
|
|
|
|
|
|
class Taxonomy(Mapping):
    """A single taxonomy: a namespace and its predicates.

    Instances behave as a read-only mapping from predicate value to
    :class:`Predicate`.

    The base class is ``Mapping`` from ``collections.abc``: the
    ``collections.Mapping`` alias no longer exists on Python 3.10+.
    """

    def __init__(self, taxonomy):
        """Build the taxonomy from its dictionary description.

        :param taxonomy: mapping with the mandatory keys ``'namespace'``,
            ``'description'``, ``'version'`` and ``'predicates'`` (a missing
            one raises ``KeyError``) and the optional keys ``'expanded'``,
            ``'refs'``, ``'type'``, ``'exclusive'`` and ``'values'``.
            The mapping itself is kept as ``self.taxonomy``.
        """
        self.taxonomy = taxonomy
        self.name = self.taxonomy['namespace']
        self.description = self.taxonomy['description']
        self.version = self.taxonomy['version']
        self.expanded = self.taxonomy.get('expanded')
        self.refs = self.taxonomy.get('refs')
        self.type = self.taxonomy.get('type')
        self.exclusive = self.taxonomy.get('exclusive')
        self.__init_predicates()

    def __init_predicates(self):
        """Populate ``self.predicates``, attaching entries to each predicate."""
        self.predicates = {}
        # Group the entry descriptions of every 'values' item by predicate;
        # a predicate listed several times gets all its entries concatenated.
        entries = {}
        for v in self.taxonomy.get('values') or []:
            entries.setdefault(v['predicate'], []).extend(v['entry'])
        for p in self.taxonomy['predicates']:
            self.predicates[p['value']] = Predicate(p, entries.get(p['value']))

    def to_json(self):
        """Return the taxonomy serialised to a JSON string."""
        return json.dumps(self, cls=EncodeTaxonomies)

    def to_dict(self):
        """Return the taxonomy as a plain dict, leaving out unset fields.

        Entries are moved out of the individual predicate dicts into a
        top-level ``'values'`` list of ``{'predicate': ..., 'entry': [...]}``
        items; ``'values'`` is omitted when no predicate has entries.
        """
        to_return = {'namespace': self.name, 'description': self.description,
                     'version': self.version}
        if self.expanded:
            to_return['expanded'] = self.expanded
        if self.refs:
            to_return['refs'] = self.refs
        if self.type:
            to_return['type'] = self.type
        if self.exclusive:
            to_return['exclusive'] = self.exclusive
        predicates = [p.to_dict() for p in self.values()]
        entries = []
        for p in predicates:
            if p.get('entries') is None:
                continue
            # pop() strips the entries from the predicate dict so that they
            # only appear once, under 'values'.
            entries.append({'predicate': p['value'],
                            'entry': [e.to_dict() for e in p.pop('entries')]})
        to_return['predicates'] = predicates
        if entries:
            to_return['values'] = entries
        return to_return

    def has_entries(self):
        """Return True when at least one predicate has entries."""
        return any(p.entries for p in self.values())

    def __str__(self):
        return '\n'.join(self.machinetags())

    def make_machinetag(self, predicate, entry=None):
        """Format a machinetag for *predicate* (and *entry*, when truthy)."""
        if entry:
            return '{}:{}="{}"'.format(self.name, predicate, entry)
        else:
            return '{}:{}'.format(self.name, predicate)

    def machinetags(self):
        """Return every machinetag of the taxonomy as a list of strings.

        A predicate with entries yields one ``namespace:predicate="entry"``
        tag per entry; a predicate without entries yields a single
        ``namespace:predicate`` tag.
        """
        to_return = []
        for p, content in self.items():
            if content:
                for k in content.keys():
                    to_return.append('{}:{}="{}"'.format(self.name, p, k))
            else:
                to_return.append('{}:{}'.format(self.name, p))
        return to_return

    def __getitem__(self, predicate):
        return self.predicates[predicate]

    def __iter__(self):
        return iter(self.predicates)

    def __len__(self):
        return len(self.predicates)

    def amount_entries(self):
        """Return the number of entries, or of predicates if there are none.

        When at least one predicate has entries the result is the sum of the
        entry counts of all predicates; otherwise it is the predicate count.
        """
        if self.has_entries():
            return sum(len(e) for e in self.values())
        else:
            return len(self.keys())

    def machinetags_expanded(self):
        """Return the machinetags with each entry's ``expanded`` text.

        Same layout as :meth:`machinetags`, except that the quoted part is
        the entry's ``expanded`` attribute instead of its value.
        """
        to_return = []
        for p, content in self.items():
            if content:
                for k, entry in content.items():
                    to_return.append('{}:{}="{}"'.format(self.name, p, entry.expanded))
            else:
                to_return.append('{}:{}'.format(self.name, p))
        return to_return
|
|
|
|
|
|
class Taxonomies(Mapping):
    """The collection of taxonomies listed in a MANIFEST.json.

    Instances behave as a read-only mapping from taxonomy name to
    :class:`Taxonomy`.

    The base class is ``Mapping`` from ``collections.abc``: the
    ``collections.Mapping`` alias no longer exists on Python 3.10+.
    """

    def __init__(self, manifest_url='https://raw.githubusercontent.com/MISP/misp-taxonomies/master/MANIFEST.json',
                 manifest_path=os.path.join(os.path.abspath(os.path.dirname(sys.modules['pytaxonomies'].__file__)),
                                            'data', 'misp-taxonomies', 'MANIFEST.json')):
        """Load the manifest and every taxonomy it lists.

        :param manifest_url: URL of the manifest; only used when
            *manifest_path* is falsy.
        :param manifest_path: path of a local manifest. When truthy (the
            default) the manifest and the taxonomies are read from disk,
            relative to the manifest's directory; otherwise they are
            fetched over HTTP from the base URL given in the manifest.
        """
        if manifest_path:
            self.loader = self.__load_path
            self.manifest = self.loader(manifest_path)
            self.url = os.path.dirname(os.path.realpath(manifest_path))
        else:
            self.loader = self.__load_url
            self.manifest = self.loader(manifest_url)
            self.url = self.manifest['url']
        self.version = self.manifest['version']
        self.license = self.manifest['license']
        self.description = self.manifest['description']
        self.__init_taxonomies()

    def validate_with_schema(self):
        """Validate every loaded taxonomy against the bundled schema.json.

        :raises ImportError: when the ``jsonschema`` package is missing.
        """
        if not HAS_JSONSCHEMA:
            raise ImportError('jsonschema is required: pip install jsonschema')
        schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pytaxonomies'].__file__)), 'data', 'misp-taxonomies', 'schema.json')
        with open(schema, 'r') as f:
            loaded_schema = json.load(f)
        for t in self.values():
            jsonschema.validate(t.taxonomy, loaded_schema)

    def __load_path(self, path):
        """Return the parsed JSON content of the file at *path*."""
        with open(path, 'r') as f:
            return json.load(f)

    def __load_url(self, url):
        """Return the parsed JSON document fetched from *url*.

        :raises Exception: when the ``requests`` package is missing.
        """
        if not HAS_REQUESTS:
            raise Exception("Python module 'requests' isn't installed, unable to fetch the taxonomies.")
        return requests.get(url).json()

    def __make_uri(self, taxonomy_name):
        """Return the location (path or URL) of the named taxonomy's file."""
        return '{}/{}/{}'.format(self.url, taxonomy_name, self.manifest['path'])

    def __init_taxonomies(self):
        """Load each taxonomy of the manifest into ``self.taxonomies``.

        :raises Exception: when a taxonomy's namespace differs from the name
            under which the manifest lists it.
        """
        self.taxonomies = {}
        for t in self.manifest['taxonomies']:
            uri = self.__make_uri(t['name'])
            tax = self.loader(uri)
            self.taxonomies[t['name']] = Taxonomy(tax)
            if t['name'] != self.taxonomies[t['name']].name:
                raise Exception("The name of the taxonomy in the manifest ({}) doesn't match with the name in the taxonomy ({})".format(t['name'], self.taxonomies[t['name']].name))

    def __getitem__(self, name):
        return self.taxonomies[name]

    def __iter__(self):
        return iter(self.taxonomies)

    def __len__(self):
        return len(self.taxonomies)

    def __str__(self):
        return ''.join("{}\n\n".format(str(taxonomy)) for taxonomy in self.values())

    def search(self, query, expanded=False):
        """Return the machinetags having a component matching *query*.

        A machinetag is split on ``:``, ``=`` and ``"``; it matches when any
        of the resulting components starts or ends with *query* (compared
        case-insensitively). Each matching machinetag is returned once, even
        when several of its components match.

        :param query: text to look for.
        :param expanded: search the expanded machinetags instead of the
            plain ones.
        """
        query = query.lower()
        to_return = []
        for taxonomy in self.values():
            if expanded:
                machinetags = taxonomy.machinetags_expanded()
            else:
                machinetags = taxonomy.machinetags()
            for mt in machinetags:
                parts = (part.lower() for part in re.findall(r'[^:="]*', mt) if part)
                # any() stops at the first hit, so a tag whose namespace and
                # predicate both match is no longer reported twice.
                if any(part.startswith(query) or part.endswith(query) for part in parts):
                    to_return.append(mt)
        return to_return

    def revert_machinetag(self, machinetag):
        """Resolve a machinetag string back to the objects it names.

        :returns: ``(taxonomy, predicate, entry)`` when the tag carries a
            non-empty entry, ``(taxonomy, predicate)`` otherwise.
        :raises IndexError: when the string does not have the
            ``namespace:predicate`` or ``namespace:predicate="entry"`` form.
        :raises KeyError: when the namespace, predicate or entry is unknown.
        """
        if '=' in machinetag:
            name, predicat, entry = re.findall(r'^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
        else:
            name, predicat = re.findall(r'^([^:]*):([^=]*)$', machinetag)[0]
            entry = None
        if entry:
            return self.taxonomies[name], self.taxonomies[name][predicat], self.taxonomies[name][predicat][entry]
        else:
            return self.taxonomies[name], self.taxonomies[name][predicat]

    def all_machinetags(self, expanded=False):
        """Return one list of machinetags per taxonomy (a list of lists).

        :param expanded: return the expanded machinetags instead of the
            plain ones.
        """
        if expanded:
            return [taxonomy.machinetags_expanded() for taxonomy in self.values()]
        return [taxonomy.machinetags() for taxonomy in self.values()]
|