Minor style changes.

Removed OrderedDict and update()... Also a lot of single quoting except for errors
stix2.1
Emmanuelle Vargas-Gonzalez 2018-06-29 18:38:04 -04:00
parent 9cc74e88b6
commit 7fd379d0b5
25 changed files with 511 additions and 638 deletions

View File

@ -11,7 +11,7 @@ VERSION_FILE = os.path.join(BASE_DIR, 'stix2', 'version.py')
def get_version():
with open(VERSION_FILE) as f:
for line in f.readlines():
if line.startswith("__version__"):
if line.startswith('__version__'):
version = line.split()[-1].strip('"')
return version
raise AttributeError("Package does not have a __version__")

View File

@ -70,4 +70,4 @@ from .version import __version__
_collect_stix2_obj_maps()
DEFAULT_VERSION = "2.1" # Default version will always be the latest STIX 2.X version
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version

View File

@ -104,11 +104,11 @@ class _STIXBase(collections.Mapping):
def _check_at_least_one_property(self, list_of_properties=None):
if not list_of_properties:
list_of_properties = sorted(list(self.__class__._properties.keys()))
if "type" in list_of_properties:
list_of_properties.remove("type")
if 'type' in list_of_properties:
list_of_properties.remove('type')
current_properties = self.properties_populated()
list_of_properties_populated = set(list_of_properties).intersection(current_properties)
if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(["extensions"])):
if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(['extensions'])):
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties):
@ -121,8 +121,8 @@ class _STIXBase(collections.Mapping):
raise DependentPropertiesError(self.__class__, failed_dependency_pairs)
def _check_object_constraints(self):
for m in self.get("granular_markings", []):
validate(self, m.get("selectors"))
for m in self.get('granular_markings', []):
validate(self, m.get('selectors'))
def __init__(self, allow_custom=False, **kwargs):
cls = self.__class__
@ -190,7 +190,7 @@ class _STIXBase(collections.Mapping):
# usual behavior of this method reads an __init__-assigned attribute,
# which would cause infinite recursion. So this check disables all
# attribute reads until the instance has been properly initialized.
unpickling = "_inner" not in self.__dict__
unpickling = '_inner' not in self.__dict__
if not unpickling and name in self:
return self.__getitem__(name)
raise AttributeError("'%s' object has no attribute '%s'" %
@ -206,8 +206,8 @@ class _STIXBase(collections.Mapping):
def __repr__(self):
props = [(k, self[k]) for k in self.object_properties() if self.get(k)]
return "{0}({1})".format(self.__class__.__name__,
", ".join(["{0!s}={1!r}".format(k, v) for k, v in props]))
return '{0}({1})'.format(self.__class__.__name__,
', '.join(['{0!s}={1!r}'.format(k, v) for k, v in props]))
def __deepcopy__(self, memo):
# Assume: we can ignore the memo argument, because no object will ever contain the same sub-object multiple times.
@ -273,7 +273,7 @@ class _STIXBase(collections.Mapping):
def sort_by(element):
return find_property_index(self, *element)
kwargs.update({'indent': 4, 'separators': (",", ": "), 'item_sort_key': sort_by})
kwargs.update({'indent': 4, 'separators': (',', ': '), 'item_sort_key': sort_by})
if include_optional_defaults:
return json.dumps(self, cls=STIXJSONIncludeOptionalDefaultsEncoder, **kwargs)

View File

@ -31,13 +31,13 @@ def none_low_med_high_to_value(scale_value):
Raises:
ValueError: If `scale_value` is not within the accepted strings.
"""
if scale_value == "None":
if scale_value == 'None':
return 0
elif scale_value == "Low":
elif scale_value == 'Low':
return 15
elif scale_value == "Med":
elif scale_value == 'Med':
return 50
elif scale_value == "High":
elif scale_value == 'High':
return 85
else:
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value)
@ -69,13 +69,13 @@ def value_to_none_low_medium_high(confidence_value):
"""
if confidence_value == 0:
return "None"
return 'None'
elif 29 >= confidence_value >= 1:
return "Low"
return 'Low'
elif 69 >= confidence_value >= 30:
return "Med"
return 'Med'
elif 100 >= confidence_value >= 70:
return "High"
return 'High'
else:
raise ValueError("Range of values out of bounds: %s" % confidence_value)
@ -114,27 +114,27 @@ def zero_ten_to_value(scale_value):
ValueError: If `scale_value` is not within the accepted strings.
"""
if scale_value == "0":
if scale_value == '0':
return 0
elif scale_value == "1":
elif scale_value == '1':
return 10
elif scale_value == "2":
elif scale_value == '2':
return 20
elif scale_value == "3":
elif scale_value == '3':
return 30
elif scale_value == "4":
elif scale_value == '4':
return 40
elif scale_value == "5":
elif scale_value == '5':
return 50
elif scale_value == "6":
elif scale_value == '6':
return 60
elif scale_value == "7":
elif scale_value == '7':
return 70
elif scale_value == "8":
elif scale_value == '8':
return 80
elif scale_value == "9":
elif scale_value == '9':
return 90
elif scale_value == "10":
elif scale_value == '10':
return 100
else:
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value)
@ -173,27 +173,27 @@ def value_to_zero_ten(confidence_value):
"""
if 4 >= confidence_value >= 0:
return "0"
return '0'
elif 14 >= confidence_value >= 5:
return "1"
return '1'
elif 24 >= confidence_value >= 15:
return "2"
return '2'
elif 34 >= confidence_value >= 25:
return "3"
return '3'
elif 44 >= confidence_value >= 35:
return "4"
return '4'
elif 54 >= confidence_value >= 45:
return "5"
return '5'
elif 64 >= confidence_value >= 55:
return "6"
return '6'
elif 74 >= confidence_value >= 65:
return "7"
return '7'
elif 84 >= confidence_value >= 75:
return "8"
return '8'
elif 94 >= confidence_value >= 85:
return "9"
return '9'
elif 100 >= confidence_value >= 95:
return "10"
return '10'
else:
raise ValueError("Range of values out of bounds: %s" % confidence_value)
@ -229,17 +229,17 @@ def admiralty_credibility_to_value(scale_value):
ValueError: If `scale_value` is not within the accepted strings.
"""
if scale_value == "6 - Truth cannot be judged":
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value) # TODO: What happens here?
elif scale_value == "5 - Improbable":
if scale_value == '6 - Truth cannot be judged':
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value)
elif scale_value == '5 - Improbable':
return 10
elif scale_value == "4 - Doubtful":
elif scale_value == '4 - Doubtful':
return 30
elif scale_value == "3 - Possibly True":
elif scale_value == '3 - Possibly True':
return 50
elif scale_value == "2 - Probably True":
elif scale_value == '2 - Probably True':
return 70
elif scale_value == "1 - Confirmed by other sources":
elif scale_value == '1 - Confirmed by other sources':
return 90
else:
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value)
@ -272,17 +272,16 @@ def value_to_admiralty_credibility(confidence_value):
ValueError: If `confidence_value` is out of bounds.
"""
# TODO: Case "6 - Truth cannot be judged"
if 19 >= confidence_value >= 0:
return "5 - Improbable"
return '5 - Improbable'
elif 39 >= confidence_value >= 20:
return "4 - Doubtful"
return '4 - Doubtful'
elif 59 >= confidence_value >= 40:
return "3 - Possibly True"
return '3 - Possibly True'
elif 79 >= confidence_value >= 60:
return "2 - Probably True"
return '2 - Probably True'
elif 100 >= confidence_value >= 80:
return "1 - Confirmed by other sources"
return '1 - Confirmed by other sources'
else:
raise ValueError("Range of values out of bounds: %s" % confidence_value)
@ -320,19 +319,19 @@ def wep_to_value(scale_value):
ValueError: If `scale_value` is not within the accepted strings.
"""
if scale_value == "Impossible":
if scale_value == 'Impossible':
return 0
elif scale_value == "Highly Unlikely/Almost Certainly Not":
elif scale_value == 'Highly Unlikely/Almost Certainly Not':
return 10
elif scale_value == "Unlikely/Probably Not":
elif scale_value == 'Unlikely/Probably Not':
return 30
elif scale_value == "Even Chance":
elif scale_value == 'Even Chance':
return 50
elif scale_value == "Likely/Probable":
elif scale_value == 'Likely/Probable':
return 70
elif scale_value == "Highly likely/Almost Certain":
elif scale_value == 'Highly likely/Almost Certain':
return 90
elif scale_value == "Certain":
elif scale_value == 'Certain':
return 100
else:
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value)
@ -367,19 +366,19 @@ def value_to_wep(confidence_value):
"""
if confidence_value == 0:
return "Impossible"
return 'Impossible'
elif 19 >= confidence_value >= 1:
return "Highly Unlikely/Almost Certainly Not"
return 'Highly Unlikely/Almost Certainly Not'
elif 39 >= confidence_value >= 20:
return "Unlikely/Probably Not"
return 'Unlikely/Probably Not'
elif 59 >= confidence_value >= 40:
return "Even Chance"
return 'Even Chance'
elif 79 >= confidence_value >= 60:
return "Likely/Probable"
return 'Likely/Probable'
elif 99 >= confidence_value >= 80:
return "Highly likely/Almost Certain"
return 'Highly likely/Almost Certain'
elif confidence_value == 100:
return "Certain"
return 'Certain'
else:
raise ValueError("Range of values out of bounds: %s" % confidence_value)
@ -417,19 +416,19 @@ def dni_to_value(scale_value):
ValueError: If `scale_value` is not within the accepted strings.
"""
if scale_value == "Almost No Chance / Remote":
if scale_value == 'Almost No Chance / Remote':
return 5
elif scale_value == "Very Unlikely / Highly Improbable":
elif scale_value == 'Very Unlikely / Highly Improbable':
return 15
elif scale_value == "Unlikely / Improbable":
elif scale_value == 'Unlikely / Improbable':
return 30
elif scale_value == "Roughly Even Change / Roughly Even Odds":
elif scale_value == 'Roughly Even Change / Roughly Even Odds':
return 50
elif scale_value == "Likely / Probable":
elif scale_value == 'Likely / Probable':
return 70
elif scale_value == "Very Likely / Highly Probable":
elif scale_value == 'Very Likely / Highly Probable':
return 85
elif scale_value == "Almost Certain / Nearly Certain":
elif scale_value == 'Almost Certain / Nearly Certain':
return 95
else:
raise ValueError("STIX Confidence value cannot be determined for %s" % scale_value)
@ -464,18 +463,18 @@ def value_to_dni(confidence_value):
"""
if 9 >= confidence_value >= 0:
return "Almost No Chance / Remote"
return 'Almost No Chance / Remote'
elif 19 >= confidence_value >= 10:
return "Very Unlikely / Highly Improbable"
return 'Very Unlikely / Highly Improbable'
elif 39 >= confidence_value >= 20:
return "Unlikely / Improbable"
return 'Unlikely / Improbable'
elif 59 >= confidence_value >= 40:
return "Roughly Even Change / Roughly Even Odds"
return 'Roughly Even Change / Roughly Even Odds'
elif 79 >= confidence_value >= 60:
return "Likely / Probable"
return 'Likely / Probable'
elif 89 >= confidence_value >= 80:
return "Very Likely / Highly Probable"
return 'Very Likely / Highly Probable'
elif 100 >= confidence_value >= 90:
return "Almost Certain / Nearly Certain"
return 'Almost Certain / Nearly Certain'
else:
raise ValueError("Range of values out of bounds: %s" % confidence_value)

View File

@ -76,11 +76,11 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
if 'type' not in stix_dict:
raise ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))
if "spec_version" in stix_dict:
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SDOs, SROs, and markings only.
v = 'v' + stix_dict["spec_version"].replace('.', '')
elif stix_dict["type"] == "bundle":
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif stix_dict['type'] == 'bundle':
# bundles without spec_version are ambiguous.
if version:
v = 'v' + version.replace('.', '')

View File

@ -83,7 +83,8 @@ class DataStoreMixin(object):
try:
return self.source.get(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
msg = "%s has no data source to query"
raise AttributeError(msg % self.__class__.__name__)
def all_versions(self, *args, **kwargs):
"""Retrieve all versions of a single STIX object by ID.
@ -100,7 +101,8 @@ class DataStoreMixin(object):
try:
return self.source.all_versions(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
msg = "%s has no data source to query"
raise AttributeError(msg % self.__class__.__name__)
def query(self, *args, **kwargs):
"""Retrieve STIX objects matching a set of filters.
@ -118,7 +120,8 @@ class DataStoreMixin(object):
try:
return self.source.query(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
msg = "%s has no data source to query"
raise AttributeError(msg % self.__class__.__name__)
def creator_of(self, *args, **kwargs):
"""Retrieve the Identity refered to by the object's `created_by_ref`.
@ -137,7 +140,8 @@ class DataStoreMixin(object):
try:
return self.source.creator_of(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
msg = "%s has no data source to query"
raise AttributeError(msg % self.__class__.__name__)
def relationships(self, *args, **kwargs):
"""Retrieve Relationships involving the given STIX object.
@ -163,7 +167,8 @@ class DataStoreMixin(object):
try:
return self.source.relationships(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
msg = "%s has no data source to query"
raise AttributeError(msg % self.__class__.__name__)
def related_to(self, *args, **kwargs):
"""Retrieve STIX Objects that have a Relationship involving the given
@ -193,7 +198,8 @@ class DataStoreMixin(object):
try:
return self.source.related_to(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
msg = "%s has no data source to query"
raise AttributeError(msg % self.__class__.__name__)
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
@ -208,7 +214,8 @@ class DataStoreMixin(object):
try:
return self.sink.add(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data sink to put objects in' % self.__class__.__name__)
msg = "%s has no data sink to put objects in"
raise AttributeError(msg % self.__class__.__name__)
class DataSink(with_metaclass(ABCMeta)):
@ -457,7 +464,7 @@ class CompositeDataSource(DataSource):
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
raise AttributeError("CompositeDataSource has no data sources")
all_data = []
all_filters = FilterSet()
@ -504,7 +511,7 @@ class CompositeDataSource(DataSource):
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
raise AttributeError("CompositeDataSource has no data sources")
all_data = []
all_filters = FilterSet()
@ -543,7 +550,7 @@ class CompositeDataSource(DataSource):
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
raise AttributeError("CompositeDataSource has no data sources")
if not query:
# don't mess with the query (i.e. deduplicate, as that's done
@ -594,7 +601,7 @@ class CompositeDataSource(DataSource):
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
raise AttributeError("CompositeDataSource has no data sources")
results = []
for ds in self.data_sources:
@ -634,7 +641,7 @@ class CompositeDataSource(DataSource):
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
raise AttributeError("CompositeDataSource has no data sources")
results = []
for ds in self.data_sources:

View File

@ -1,7 +1,4 @@
"""
Python STIX 2.0 FileSystem Source/Sink
"""
"""Python STIX 2.0 FileSystem Source/Sink"""
import json
import os
@ -78,7 +75,7 @@ class FileSystemSink(DataSink):
def _check_path_and_write(self, stix_obj):
"""Write the given STIX object to a file in the STIX file directory.
"""
path = os.path.join(self._stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
path = os.path.join(self._stix_dir, stix_obj['type'], stix_obj['id'] + '.json')
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
@ -86,7 +83,7 @@ class FileSystemSink(DataSink):
if self.bundlify:
stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom)
with open(path, "w") as f:
with open(path, 'w') as f:
f.write(str(stix_obj))
def add(self, stix_data=None, version=None):
@ -112,9 +109,9 @@ class FileSystemSink(DataSink):
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
if stix_data["type"] == "bundle":
if stix_data['type'] == 'bundle':
# extract STIX objects
for stix_obj in stix_data.get("objects", []):
for stix_obj in stix_data.get('objects', []):
self.add(stix_obj, version=version)
else:
# adding json-formatted STIX
@ -122,7 +119,7 @@ class FileSystemSink(DataSink):
elif isinstance(stix_data, Bundle):
# recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []):
for stix_obj in stix_data.get('objects', []):
self.add(stix_obj, version=version)
elif isinstance(stix_data, list):
@ -177,7 +174,7 @@ class FileSystemSource(DataSource):
a python STIX object and then returned
"""
query = [Filter("id", "=", stix_id)]
query = [Filter('id', '=', stix_id)]
all_data = self.query(query=query, version=version, _composite_filters=_composite_filters)
@ -252,12 +249,12 @@ class FileSystemSource(DataSource):
# the corresponding subdirectories as well
include_paths = []
declude_paths = []
if "type" in [filter.property for filter in file_filters]:
if 'type' in [filter.property for filter in file_filters]:
for filter in file_filters:
if filter.property == "type":
if filter.op == "=":
if filter.property == 'type':
if filter.op == '=':
include_paths.append(os.path.join(self._stix_dir, filter.value))
elif filter.op == "!=":
elif filter.op == '!=':
declude_paths.append(os.path.join(self._stix_dir, filter.value))
else:
# have to walk entire STIX directory
@ -281,9 +278,9 @@ class FileSystemSource(DataSource):
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
if "id" in [filter.property for filter in file_filters]:
if 'id' in [filter.property for filter in file_filters]:
for filter in file_filters:
if filter.property == "id" and filter.op == "=":
if filter.property == 'id' and filter.op == '=':
id_ = filter.value
break
else:
@ -295,21 +292,21 @@ class FileSystemSource(DataSource):
for path in include_paths:
for root, dirs, files in os.walk(path):
for file_ in files:
if not file_.endswith(".json"):
if not file_.endswith('.json'):
# skip non '.json' files as more likely to be random non-STIX files
continue
if not id_ or id_ == file_.split(".")[0]:
if not id_ or id_ == file_.split('.')[0]:
# have to load into memory regardless to evaluate other filters
try:
stix_obj = json.load(open(os.path.join(root, file_)))
if stix_obj["type"] == "bundle":
stix_obj = stix_obj["objects"][0]
if stix_obj['type'] == 'bundle':
stix_obj = stix_obj['objects'][0]
# naive STIX type checking
stix_obj["type"]
stix_obj["id"]
stix_obj['type']
stix_obj['id']
except (ValueError, KeyError): # likely not a JSON file
raise TypeError("STIX JSON object at '{0}' could either not be parsed to "
@ -339,6 +336,6 @@ class FileSystemSource(DataSource):
"""
file_filters = []
for filter_ in query:
if filter_.property == "id" or filter_.property == "type":
if filter_.property == 'id' or filter_.property == 'type':
file_filters.append(filter_)
return file_filters

View File

@ -1,7 +1,4 @@
"""
Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
"""
"""Filters for Python STIX 2.0 DataSources, DataSinks, DataStores"""
import collections
from datetime import datetime
@ -40,14 +37,14 @@ def _check_filter_components(prop, op, value):
# check filter value type is supported
raise TypeError("Filter value of '%s' is not supported. The type must be a Python immutable type or dictionary" % type(value))
if prop == "type" and "_" in value:
if prop == 'type' and '_' in value:
# check filter where the property is type, value (type name) cannot have underscores
raise ValueError("Filter for property 'type' cannot have its value '%s' include underscores" % value)
return True
class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
class Filter(collections.namedtuple('Filter', ['property', 'op', 'value'])):
"""STIX 2 filters that support the querying functionality of STIX 2
DataStores and DataSources.
@ -94,19 +91,19 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
# but will put here for now
stix_obj_property = format_datetime(stix_obj_property)
if self.op == "=":
if self.op == '=':
return stix_obj_property == self.value
elif self.op == "!=":
elif self.op == '!=':
return stix_obj_property != self.value
elif self.op == "in":
elif self.op == 'in':
return stix_obj_property in self.value
elif self.op == ">":
elif self.op == '>':
return stix_obj_property > self.value
elif self.op == "<":
elif self.op == '<':
return stix_obj_property < self.value
elif self.op == ">=":
elif self.op == '>=':
return stix_obj_property >= self.value
elif self.op == "<=":
elif self.op == '<=':
return stix_obj_property <= self.value
else:
raise ValueError("Filter operator: {0} not supported for specified property: {1}".format(self.op, self.property))
@ -153,7 +150,7 @@ def _check_filter(filter_, stix_obj):
"""
# For properties like granular_markings and external_references
# need to extract the first property from the string.
prop = filter_.property.split(".")[0]
prop = filter_.property.split('.')[0]
if prop not in stix_obj.keys():
# check filter "property" is in STIX object - if cant be
@ -161,9 +158,9 @@ def _check_filter(filter_, stix_obj):
# (i.e. did not make it through the filter)
return False
if "." in filter_.property:
if '.' in filter_.property:
# Check embedded properties, from e.g. granular_markings or external_references
sub_property = filter_.property.split(".", 1)[1]
sub_property = filter_.property.split('.', 1)[1]
sub_filter = filter_._replace(property=sub_property)
if isinstance(stix_obj[prop], list):

View File

@ -36,16 +36,16 @@ def _add(store, stix_data=None, version=None):
"""
if isinstance(stix_data, _STIXBase):
# adding a python STIX object
store._data[stix_data["id"]] = stix_data
store._data[stix_data['id']] = stix_data
elif isinstance(stix_data, dict):
if stix_data["type"] == "bundle":
if stix_data['type'] == 'bundle':
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data.get("objects", []):
for stix_obj in stix_data.get('objects', []):
_add(store, stix_obj, version=version)
else:
# adding a json STIX object
store._data[stix_data["id"]] = stix_data
store._data[stix_data['id']] = stix_data
elif isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
@ -156,7 +156,7 @@ class MemorySink(DataSink):
if not os.path.exists(os.path.dirname(file_path)):
os.makedirs(os.path.dirname(file_path))
with open(file_path, "w") as f:
with open(file_path, 'w') as f:
f.write(str(Bundle(list(self._data.values()), allow_custom=self.allow_custom)))
save_to_file.__doc__ = MemoryStore.save_to_file.__doc__
@ -217,7 +217,7 @@ class MemorySource(DataSource):
return stix_obj
# if there are filters from the composite level, process full query
query = [Filter("id", "=", stix_id)]
query = [Filter('id', '=', stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -283,10 +283,10 @@ class MemorySource(DataSource):
return all_data
def load_from_file(self, file_path, version=None):
stix_data = json.load(open(os.path.abspath(file_path), "r"))
stix_data = json.load(open(os.path.abspath(file_path), 'r'))
if stix_data["type"] == "bundle":
for stix_obj in stix_data["objects"]:
if stix_data['type'] == 'bundle':
for stix_obj in stix_data['objects']:
_add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom))
else:
_add(self, stix_data=parse(stix_data, allow_custom=self.allow_custom, version=version))

View File

@ -1,6 +1,4 @@
"""
Python STIX 2.x TAXIICollectionStore
"""
"""Python STIX 2.x TAXIICollectionStore"""
from requests.exceptions import HTTPError
from stix2 import Bundle
@ -89,17 +87,17 @@ class TAXIICollectionSink(DataSink):
"""
if isinstance(stix_data, _STIXBase):
# adding python STIX object
if stix_data["type"] == "bundle":
bundle = stix_data.serialize(encoding="utf-8")
if stix_data['type'] == 'bundle':
bundle = stix_data.serialize(encoding='utf-8')
else:
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding="utf-8")
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8')
elif isinstance(stix_data, dict):
# adding python dict (of either Bundle or STIX obj)
if stix_data["type"] == "bundle":
bundle = parse(stix_data, allow_custom=self.allow_custom, version=version).serialize(encoding="utf-8")
if stix_data['type'] == 'bundle':
bundle = parse(stix_data, allow_custom=self.allow_custom, version=version).serialize(encoding='utf-8')
else:
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding="utf-8")
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8')
elif isinstance(stix_data, list):
# adding list of something - recurse on each
@ -110,10 +108,10 @@ class TAXIICollectionSink(DataSink):
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
if stix_data["type"] == "bundle":
bundle = stix_data.serialize(encoding="utf-8")
if stix_data['type'] == 'bundle':
bundle = stix_data.serialize(encoding='utf-8')
else:
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding="utf-8")
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8')
else:
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
@ -177,7 +175,7 @@ class TAXIICollectionSource(DataSource):
# dont extract TAXII filters from query (to send to TAXII endpoint)
# as directly retrieveing a STIX object by ID
try:
stix_objs = self.collection.get_object(stix_id)["objects"]
stix_objs = self.collection.get_object(stix_id)['objects']
stix_obj = list(apply_common_filters(stix_objs, query))
except HTTPError as e:
@ -214,8 +212,8 @@ class TAXIICollectionSource(DataSource):
"""
# make query in TAXII query format since 'id' is TAXII field
query = [
Filter("id", "=", stix_id),
Filter("version", "=", "all")
Filter('id', '=', stix_id),
Filter('version', '=', 'all')
]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -264,7 +262,7 @@ class TAXIICollectionSource(DataSource):
# query TAXII collection
try:
all_data = self.collection.get_objects(**taxii_filters_dict)["objects"]
all_data = self.collection.get_objects(**taxii_filters_dict)['objects']
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)

View File

@ -29,7 +29,7 @@ def get_markings(obj, selectors, inherited=False, descendants=False):
selectors = utils.convert_to_list(selectors)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings", [])
granular_markings = obj.get('granular_markings', [])
if not granular_markings:
return []
@ -38,11 +38,11 @@ def get_markings(obj, selectors, inherited=False, descendants=False):
for marking in granular_markings:
for user_selector in selectors:
for marking_selector in marking.get("selectors", []):
for marking_selector in marking.get('selectors', []):
if any([(user_selector == marking_selector), # Catch explicit selectors.
(user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors.
(marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors
refs = marking.get("marking_ref", [])
refs = marking.get('marking_ref', [])
results.update([refs])
return list(results)
@ -93,7 +93,7 @@ def remove_markings(obj, marking, selectors):
marking = utils.convert_to_marking_list(marking)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings")
granular_markings = obj.get('granular_markings')
if not granular_markings:
return obj
@ -102,9 +102,9 @@ def remove_markings(obj, marking, selectors):
to_remove = []
for m in marking:
to_remove.append({"marking_ref": m, "selectors": selectors})
to_remove.append({'marking_ref': m, 'selectors': selectors})
remove = utils.build_granular_marking(to_remove).get("granular_markings")
remove = utils.build_granular_marking(to_remove).get('granular_markings')
if not any(marking in granular_markings for marking in remove):
raise exceptions.MarkingNotFoundError(obj, remove)
@ -145,10 +145,10 @@ def add_markings(obj, marking, selectors):
granular_marking = []
for m in marking:
granular_marking.append({"marking_ref": m, "selectors": sorted(selectors)})
granular_marking.append({'marking_ref': m, 'selectors': sorted(selectors)})
if obj.get("granular_markings"):
granular_marking.extend(obj.get("granular_markings"))
if obj.get('granular_markings'):
granular_marking.extend(obj.get('granular_markings'))
granular_marking = utils.expand_markings(granular_marking)
granular_marking = utils.compress_markings(granular_marking)
@ -176,7 +176,7 @@ def clear_markings(obj, selectors):
selectors = utils.convert_to_list(selectors)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings")
granular_markings = obj.get('granular_markings')
if not granular_markings:
return obj
@ -184,25 +184,25 @@ def clear_markings(obj, selectors):
granular_markings = utils.expand_markings(granular_markings)
sdo = utils.build_granular_marking(
[{"selectors": selectors, "marking_ref": "N/A"}]
[{'selectors': selectors, 'marking_ref': 'N/A'}]
)
clear = sdo.get("granular_markings", [])
clear = sdo.get('granular_markings', [])
if not any(clear_selector in sdo_selectors.get("selectors", [])
if not any(clear_selector in sdo_selectors.get('selectors', [])
for sdo_selectors in granular_markings
for clear_marking in clear
for clear_selector in clear_marking.get("selectors", [])
for clear_selector in clear_marking.get('selectors', [])
):
raise exceptions.MarkingNotFoundError(obj, clear)
for granular_marking in granular_markings:
for s in selectors:
if s in granular_marking.get("selectors", []):
marking_refs = granular_marking.get("marking_ref")
if s in granular_marking.get('selectors', []):
marking_refs = granular_marking.get('marking_ref')
if marking_refs:
granular_marking["marking_ref"] = ""
granular_marking['marking_ref'] = ''
granular_markings = utils.compress_markings(granular_markings)
@ -245,19 +245,19 @@ def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=Fa
marking = utils.convert_to_marking_list(marking)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings", [])
granular_markings = obj.get('granular_markings', [])
marked = False
markings = set()
for granular_marking in granular_markings:
for user_selector in selectors:
for marking_selector in granular_marking.get("selectors", []):
for marking_selector in granular_marking.get('selectors', []):
if any([(user_selector == marking_selector), # Catch explicit selectors.
(user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors.
(marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors
marking_ref = granular_marking.get("marking_ref", "")
marking_ref = granular_marking.get('marking_ref', '')
if marking and any(x == marking_ref for x in marking):
markings.update([marking_ref])

View File

@ -18,7 +18,7 @@ def get_markings(obj):
markings are present in `object_marking_refs`.
"""
return obj.get("object_marking_refs", [])
return obj.get('object_marking_refs', [])
def add_markings(obj, marking):
@ -35,7 +35,7 @@ def add_markings(obj, marking):
"""
marking = utils.convert_to_marking_list(marking)
object_markings = set(obj.get("object_marking_refs", []) + marking)
object_markings = set(obj.get('object_marking_refs', []) + marking)
return new_version(obj, object_marking_refs=list(object_markings), allow_custom=True)
@ -59,12 +59,12 @@ def remove_markings(obj, marking):
"""
marking = utils.convert_to_marking_list(marking)
object_markings = obj.get("object_marking_refs", [])
object_markings = obj.get('object_marking_refs', [])
if not object_markings:
return obj
if any(x not in obj["object_marking_refs"] for x in marking):
if any(x not in obj['object_marking_refs'] for x in marking):
raise exceptions.MarkingNotFoundError(obj, marking)
new_markings = [x for x in object_markings if x not in marking]
@ -124,7 +124,7 @@ def is_marked(obj, marking=None):
"""
marking = utils.convert_to_marking_list(marking)
object_markings = obj.get("object_marking_refs", [])
object_markings = obj.get('object_marking_refs', [])
if marking:
return any(x in object_markings for x in marking)

View File

@ -23,7 +23,7 @@ def _evaluate_expression(obj, selector):
"""
for items, value in iterpath(obj):
path = ".".join(items)
path = '.'.join(items)
if path == selector and value:
return [value]
@ -119,12 +119,12 @@ def compress_markings(granular_markings):
map_ = collections.defaultdict(set)
for granular_marking in granular_markings:
if granular_marking.get("marking_ref"):
map_[granular_marking.get("marking_ref")].update(granular_marking.get("selectors"))
if granular_marking.get('marking_ref'):
map_[granular_marking.get('marking_ref')].update(granular_marking.get('selectors'))
compressed = \
[
{"marking_ref": marking_ref, "selectors": sorted(selectors)}
{'marking_ref': marking_ref, 'selectors': sorted(selectors)}
for marking_ref, selectors in six.iteritems(map_)
]
@ -173,12 +173,12 @@ def expand_markings(granular_markings):
expanded = []
for marking in granular_markings:
selectors = marking.get("selectors")
marking_ref = marking.get("marking_ref")
selectors = marking.get('selectors')
marking_ref = marking.get('marking_ref')
expanded.extend(
[
{"marking_ref": marking_ref, "selectors": [selector]}
{'marking_ref': marking_ref, 'selectors': [selector]}
for selector in selectors
]
)
@ -189,7 +189,7 @@ def expand_markings(granular_markings):
def build_granular_marking(granular_marking):
"""Return a dictionary with the required structure for a granular marking.
"""
return {"granular_markings": expand_markings(granular_marking)}
return {'granular_markings': expand_markings(granular_marking)}
def iterpath(obj, path=None):
@ -229,7 +229,7 @@ def iterpath(obj, path=None):
elif isinstance(varobj, list):
for item in varobj:
index = "[{0}]".format(varobj.index(item))
index = '[{0}]'.format(varobj.index(item))
path.append(index)
yield (path, item)

View File

@ -16,7 +16,7 @@ from .exceptions import (InvalidValueError, RevokeError,
NOW = object()
# STIX object properties that cannot be modified
STIX_UNMOD_PROPERTIES = ["created", "created_by_ref", "id", "type"]
STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type']
TYPE_REGEX = r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$'
@ -90,16 +90,16 @@ def format_datetime(dttm):
zoned = pytz.utc.localize(dttm)
else:
zoned = dttm.astimezone(pytz.utc)
ts = zoned.strftime("%Y-%m-%dT%H:%M:%S")
ms = zoned.strftime("%f")
precision = getattr(dttm, "precision", None)
ts = zoned.strftime('%Y-%m-%dT%H:%M:%S')
ms = zoned.strftime('%f')
precision = getattr(dttm, 'precision', None)
if precision == 'second':
pass # Already precise to the second
elif precision == "millisecond":
elif precision == 'millisecond':
ts = ts + '.' + ms[:3]
elif zoned.microsecond > 0:
ts = ts + '.' + ms.rstrip("0")
return ts + "Z"
ts = ts + '.' + ms.rstrip('0')
return ts + 'Z'
def parse_into_datetime(value, precision=None):
@ -250,11 +250,11 @@ def new_version(data, **kwargs):
"""
if not isinstance(data, Mapping):
raise ValueError('cannot create new version of object of this type! '
'Try a dictionary or instance of an SDO or SRO class.')
raise ValueError("cannot create new version of object of this type! "
"Try a dictionary or instance of an SDO or SRO class.")
unchangable_properties = []
if data.get("revoked"):
if data.get('revoked'):
raise RevokeError("new_version")
try:
new_obj_inner = copy.deepcopy(data._inner)
@ -292,10 +292,10 @@ def revoke(data):
A new version of the object with ``revoked`` set to ``True``.
"""
if not isinstance(data, Mapping):
raise ValueError('cannot revoke object of this type! Try a dictionary '
'or instance of an SDO or SRO class.')
raise ValueError("cannot revoke object of this type! Try a dictionary "
"or instance of an SDO or SRO class.")
if data.get("revoked"):
if data.get('revoked'):
raise RevokeError("revoke")
return new_version(data, revoked=True, allow_custom=True)
@ -328,13 +328,13 @@ def remove_custom_stix(stix_obj):
A new version of the object with any custom content removed
"""
if stix_obj["type"].startswith("x-"):
if stix_obj['type'].startswith('x-'):
# if entire object is custom, discard
return None
custom_props = []
for prop in stix_obj.items():
if prop[0].startswith("x_"):
if prop[0].startswith('x_'):
# for every custom property, record it and set value to None
# (so we can pass it to new_version() and it will be dropped)
custom_props.append((prop[0], None))
@ -351,7 +351,7 @@ def remove_custom_stix(stix_obj):
# existing STIX object) and the "modified" property. We dont supply the
# "modified" property so that new_version() creates a new datetime
# value for this property
non_supplied_props = STIX_UNMOD_PROPERTIES + ["modified"]
non_supplied_props = STIX_UNMOD_PROPERTIES + ['modified']
props = [(prop, stix_obj[prop]) for prop in stix_obj if prop not in non_supplied_props]
@ -360,7 +360,7 @@ def remove_custom_stix(stix_obj):
new_obj = new_version(stix_obj, **(dict(props)))
while parse_into_datetime(new_obj["modified"]) == parse_into_datetime(stix_obj["modified"]):
while parse_into_datetime(new_obj['modified']) == parse_into_datetime(stix_obj['modified']):
# Prevents bug when fast computation allows multiple STIX object
# versions to be created in single unit of time
new_obj = new_version(stix_obj, **(dict(props)))

View File

@ -24,7 +24,7 @@ class STIXObjectProperty(Property):
# validation here depend on the value of another property
# (spec_version). So this is a hack, and not technically spec-
# compliant.
if "spec_version" in value:
if 'spec_version' in value:
raise ValueError("Spec version 2.0 bundles don't yet support "
"containing objects of a different spec "
"version.")
@ -37,7 +37,7 @@ class STIXObjectProperty(Property):
raise ValueError("This property may only contain a non-empty dictionary or object")
if 'type' in dictified and dictified['type'] == 'bundle':
raise ValueError('This property may not contain a Bundle object')
if "spec_version" in dictified:
if 'spec_version' in dictified:
# See above comment regarding spec_version.
raise ValueError("Spec version 2.0 bundles don't yet support "
"containing objects of a different spec version.")
@ -53,13 +53,12 @@ class Bundle(_STIXBase):
"""
_type = 'bundle'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
# Not technically correct: STIX 2.0 spec doesn't say spec_version must
# have this value, but it's all we support for now.
('spec_version', StringProperty(fixed="2.0")),
('spec_version', StringProperty(fixed='2.0')),
('objects', ListProperty(STIXObjectProperty)),
])

View File

@ -16,8 +16,7 @@ class ExternalReference(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part1-stix-core/stix-v2.0-cs01-part1-stix-core.html#_Toc496709261>`__.
"""
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('source_name', StringProperty(required=True)),
('description', StringProperty()),
('url', StringProperty()),
@ -27,7 +26,7 @@ class ExternalReference(_STIXBase):
def _check_object_constraints(self):
super(ExternalReference, self)._check_object_constraints()
self._check_at_least_one_property(["description", "external_id", "url"])
self._check_at_least_one_property(['description', 'external_id', 'url'])
class KillChainPhase(_STIXBase):
@ -36,8 +35,7 @@ class KillChainPhase(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part1-stix-core/stix-v2.0-cs01-part1-stix-core.html#_Toc496709267>`__.
"""
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('kill_chain_name', StringProperty(required=True)),
('phase_name', StringProperty(required=True)),
])
@ -49,9 +47,8 @@ class GranularMarking(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part1-stix-core/stix-v2.0-cs01-part1-stix-core.html#_Toc496709290>`__.
"""
_properties = OrderedDict()
_properties.update([
('marking_ref', ReferenceProperty(required=True, type="marking-definition")),
_properties = OrderedDict([
('marking_ref', ReferenceProperty(required=True, type='marking-definition')),
('selectors', ListProperty(SelectorProperty, required=True)),
])
@ -64,8 +61,7 @@ class TLPMarking(_STIXBase):
# TODO: don't allow the creation of any other TLPMarkings than the ones below
_type = 'tlp'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('tlp', StringProperty(required=True))
])
@ -77,8 +73,7 @@ class StatementMarking(_STIXBase):
"""
_type = 'statement'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('statement', StringProperty(required=True))
])
@ -109,14 +104,13 @@ class MarkingDefinition(_STIXBase, _MarkingsMixin):
"""
_type = 'marking-definition'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
('definition_type', StringProperty(required=True)),
('definition', MarkingProperty(required=True)),
@ -193,29 +187,29 @@ def CustomMarking(type='x-custom-marking', properties=None):
# TODO: don't allow the creation of any other TLPMarkings than the ones below
TLP_WHITE = MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="white")
id='marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='white')
)
TLP_GREEN = MarkingDefinition(
id="marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="green")
id='marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='green')
)
TLP_AMBER = MarkingDefinition(
id="marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="amber")
id='marking-definition--f88d31f6-486f-44da-b317-01333bde0b82',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='amber')
)
TLP_RED = MarkingDefinition(
id="marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="red")
id='marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='red')
)

View File

@ -102,8 +102,7 @@ class Artifact(_Observable):
""" # noqa
_type = 'artifact'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('mime_type', StringProperty()),
('payload_bin', BinaryProperty()),
@ -114,8 +113,8 @@ class Artifact(_Observable):
def _check_object_constraints(self):
super(Artifact, self)._check_object_constraints()
self._check_mutually_exclusive_properties(["payload_bin", "url"])
self._check_properties_dependency(["hashes"], ["url"])
self._check_mutually_exclusive_properties(['payload_bin', 'url'])
self._check_properties_dependency(['hashes'], ['url'])
class AutonomousSystem(_Observable):
@ -124,8 +123,7 @@ class AutonomousSystem(_Observable):
""" # noqa
_type = 'autonomous-system'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('number', IntegerProperty(required=True)),
('name', StringProperty()),
@ -140,8 +138,7 @@ class Directory(_Observable):
""" # noqa
_type = 'directory'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('path', StringProperty(required=True)),
('path_enc', StringProperty()),
@ -160,8 +157,7 @@ class DomainName(_Observable):
""" # noqa
_type = 'domain-name'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
@ -175,8 +171,7 @@ class EmailAddress(_Observable):
""" # noqa
_type = 'email-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('display_name', StringProperty()),
@ -190,8 +185,7 @@ class EmailMIMEComponent(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part4-cyber-observable-objects/stix-v2.0-cs01-part4-cyber-observable-objects.html#_Toc496716231>`__.
""" # noqa
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('body', StringProperty()),
('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])),
('content_type', StringProperty()),
@ -200,7 +194,7 @@ class EmailMIMEComponent(_STIXBase):
def _check_object_constraints(self):
super(EmailMIMEComponent, self)._check_object_constraints()
self._check_at_least_one_property(["body", "body_raw_ref"])
self._check_at_least_one_property(['body', 'body_raw_ref'])
class EmailMessage(_Observable):
@ -209,8 +203,7 @@ class EmailMessage(_Observable):
""" # noqa
_type = 'email-message'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('is_multipart', BooleanProperty(required=True)),
('date', TimestampProperty()),
@ -231,10 +224,10 @@ class EmailMessage(_Observable):
def _check_object_constraints(self):
super(EmailMessage, self)._check_object_constraints()
self._check_properties_dependency(["is_multipart"], ["body_multipart"])
if self.get("is_multipart") is True and self.get("body"):
self._check_properties_dependency(['is_multipart'], ['body_multipart'])
if self.get('is_multipart') is True and self.get('body'):
# 'body' MAY only be used if is_multipart is false.
raise DependentPropertiesError(self.__class__, [("is_multipart", "body")])
raise DependentPropertiesError(self.__class__, [('is_multipart', 'body')])
class ArchiveExt(_Extension):
@ -243,8 +236,7 @@ class ArchiveExt(_Extension):
""" # noqa
_type = 'archive-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
('version', StringProperty()),
('comment', StringProperty()),
@ -256,8 +248,7 @@ class AlternateDataStream(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part4-cyber-observable-objects/stix-v2.0-cs01-part4-cyber-observable-objects.html#_Toc496716239>`__.
""" # noqa
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('name', StringProperty(required=True)),
('hashes', HashesProperty()),
('size', IntegerProperty()),
@ -270,8 +261,7 @@ class NTFSExt(_Extension):
""" # noqa
_type = 'ntfs-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('sid', StringProperty()),
('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))),
])
@ -283,8 +273,7 @@ class PDFExt(_Extension):
""" # noqa
_type = 'pdf-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('version', StringProperty()),
('is_optimized', BooleanProperty()),
('document_info_dict', DictionaryProperty()),
@ -299,8 +288,7 @@ class RasterImageExt(_Extension):
""" # noqa
_type = 'raster-image-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('image_height', IntegerProperty()),
('image_width', IntegerProperty()),
('bits_per_pixel', IntegerProperty()),
@ -314,8 +302,7 @@ class WindowsPEOptionalHeaderType(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part4-cyber-observable-objects/stix-v2.0-cs01-part4-cyber-observable-objects.html#_Toc496716248>`__.
""" # noqa
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('magic_hex', HexProperty()),
('major_linker_version', IntegerProperty()),
('minor_linker_version', IntegerProperty()),
@ -359,8 +346,7 @@ class WindowsPESection(_STIXBase):
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part4-cyber-observable-objects/stix-v2.0-cs01-part4-cyber-observable-objects.html#_Toc496716250>`__.
""" # noqa
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('name', StringProperty(required=True)),
('size', IntegerProperty()),
('entropy', FloatProperty()),
@ -374,8 +360,7 @@ class WindowsPEBinaryExt(_Extension):
""" # noqa
_type = 'windows-pebinary-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('pe_type', StringProperty(required=True)), # open_vocab
('imphash', StringProperty()),
('machine_hex', HexProperty()),
@ -397,8 +382,7 @@ class File(_Observable):
""" # noqa
_type = 'file'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('hashes', HashesProperty()),
('size', IntegerProperty()),
@ -421,8 +405,8 @@ class File(_Observable):
def _check_object_constraints(self):
super(File, self)._check_object_constraints()
self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"])
self._check_at_least_one_property(["hashes", "name"])
self._check_properties_dependency(['is_encrypted'], ['encryption_algorithm', 'decryption_key'])
self._check_at_least_one_property(['hashes', 'name'])
class IPv4Address(_Observable):
@ -431,8 +415,7 @@ class IPv4Address(_Observable):
""" # noqa
_type = 'ipv4-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
@ -447,8 +430,7 @@ class IPv6Address(_Observable):
""" # noqa
_type = 'ipv6-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
@ -463,8 +445,7 @@ class MACAddress(_Observable):
""" # noqa
_type = 'mac-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
@ -477,8 +458,7 @@ class Mutex(_Observable):
""" # noqa
_type = 'mutex'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('name', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
@ -491,8 +471,7 @@ class HTTPRequestExt(_Extension):
""" # noqa
_type = 'http-request-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('request_method', StringProperty(required=True)),
('request_value', StringProperty(required=True)),
('request_version', StringProperty()),
@ -508,8 +487,7 @@ class ICMPExt(_Extension):
""" # noqa
_type = 'icmp-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('icmp_type_hex', HexProperty(required=True)),
('icmp_code_hex', HexProperty(required=True)),
])
@ -521,8 +499,7 @@ class SocketExt(_Extension):
""" # noqa
_type = 'socket-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('address_family', EnumProperty(allowed=[
"AF_UNSPEC",
"AF_INET",
@ -562,8 +539,7 @@ class TCPExt(_Extension):
""" # noqa
_type = 'tcp-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('src_flags_hex', HexProperty()),
('dst_flags_hex', HexProperty()),
])
@ -575,8 +551,7 @@ class NetworkTraffic(_Observable):
""" # noqa
_type = 'network-traffic'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('start', TimestampProperty()),
('end', TimestampProperty()),
@ -600,7 +575,7 @@ class NetworkTraffic(_Observable):
def _check_object_constraints(self):
super(NetworkTraffic, self)._check_object_constraints()
self._check_at_least_one_property(["src_ref", "dst_ref"])
self._check_at_least_one_property(['src_ref', 'dst_ref'])
class WindowsProcessExt(_Extension):
@ -609,8 +584,7 @@ class WindowsProcessExt(_Extension):
""" # noqa
_type = 'windows-process-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('aslr_enabled', BooleanProperty()),
('dep_enabled', BooleanProperty()),
('priority', StringProperty()),
@ -626,8 +600,7 @@ class WindowsServiceExt(_Extension):
""" # noqa
_type = 'windows-service-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('service_name', StringProperty(required=True)),
('descriptions', ListProperty(StringProperty)),
('display_name', StringProperty()),
@ -664,8 +637,7 @@ class Process(_Observable):
""" # noqa
_type = 'process'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('is_hidden', BooleanProperty()),
('pid', IntegerProperty()),
@ -689,14 +661,14 @@ class Process(_Observable):
super(Process, self)._check_object_constraints()
try:
self._check_at_least_one_property()
if "windows-process-ext" in self.get('extensions', {}):
self.extensions["windows-process-ext"]._check_at_least_one_property()
if 'windows-process-ext' in self.get('extensions', {}):
self.extensions['windows-process-ext']._check_at_least_one_property()
except AtLeastOnePropertyError as enclosing_exc:
if 'extensions' not in self:
raise enclosing_exc
else:
if "windows-process-ext" in self.get('extensions', {}):
self.extensions["windows-process-ext"]._check_at_least_one_property()
if 'windows-process-ext' in self.get('extensions', {}):
self.extensions['windows-process-ext']._check_at_least_one_property()
class Software(_Observable):
@ -705,8 +677,7 @@ class Software(_Observable):
""" # noqa
_type = 'software'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('name', StringProperty(required=True)),
('cpe', StringProperty()),
@ -723,8 +694,7 @@ class URL(_Observable):
""" # noqa
_type = 'url'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
@ -737,8 +707,7 @@ class UNIXAccountExt(_Extension):
""" # noqa
_type = 'unix-account-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('gid', IntegerProperty()),
('groups', ListProperty(StringProperty)),
('home_dir', StringProperty()),
@ -752,8 +721,7 @@ class UserAccount(_Observable):
""" # noqa
_type = 'user-account'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('user_id', StringProperty(required=True)),
('account_login', StringProperty()),
@ -778,24 +746,23 @@ class WindowsRegistryValueType(_STIXBase):
""" # noqa
_type = 'windows-registry-value-type'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('name', StringProperty(required=True)),
('data', StringProperty()),
('data_type', EnumProperty(allowed=[
'REG_NONE',
'REG_SZ',
'REG_EXPAND_SZ',
'REG_BINARY',
'REG_DWORD',
'REG_DWORD_BIG_ENDIAN',
'REG_LINK',
'REG_MULTI_SZ',
'REG_RESOURCE_LIST',
'REG_FULL_RESOURCE_DESCRIPTION',
'REG_RESOURCE_REQUIREMENTS_LIST',
'REG_QWORD',
'REG_INVALID_TYPE',
"REG_NONE",
"REG_SZ",
"REG_EXPAND_SZ",
"REG_BINARY",
"REG_DWORD",
"REG_DWORD_BIG_ENDIAN",
"REG_LINK",
"REG_MULTI_SZ",
"REG_RESOURCE_LIST",
"REG_FULL_RESOURCE_DESCRIPTION",
"REG_RESOURCE_REQUIREMENTS_LIST",
"REG_QWORD",
"REG_INVALID_TYPE",
])),
])
@ -806,8 +773,7 @@ class WindowsRegistryKey(_Observable):
""" # noqa
_type = 'windows-registry-key'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('key', StringProperty(required=True)),
('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))),
@ -830,8 +796,7 @@ class X509V3ExtenstionsType(_STIXBase):
""" # noqa
_type = 'x509-v3-extensions-type'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('basic_constraints', StringProperty()),
('name_constraints', StringProperty()),
('policy_constraints', StringProperty()),
@ -857,8 +822,7 @@ class X509Certificate(_Observable):
""" # noqa
_type = 'x509-certificate'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('is_self_signed', BooleanProperty()),
('hashes', HashesProperty()),

View File

@ -25,11 +25,10 @@ class AttackPattern(STIXDomainObject):
"""
_type = 'attack-pattern'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -38,7 +37,7 @@ class AttackPattern(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -49,11 +48,10 @@ class Campaign(STIXDomainObject):
"""
_type = 'campaign'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -65,7 +63,7 @@ class Campaign(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -76,11 +74,10 @@ class CourseOfAction(STIXDomainObject):
"""
_type = 'course-of-action'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -88,7 +85,7 @@ class CourseOfAction(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -99,11 +96,10 @@ class Identity(STIXDomainObject):
"""
_type = 'identity'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -114,7 +110,7 @@ class Identity(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -125,11 +121,10 @@ class Indicator(STIXDomainObject):
"""
_type = 'indicator'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty()),
@ -141,7 +136,7 @@ class Indicator(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -152,11 +147,10 @@ class IntrusionSet(STIXDomainObject):
"""
_type = 'intrusion-set'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -171,7 +165,7 @@ class IntrusionSet(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -182,11 +176,10 @@ class Malware(STIXDomainObject):
"""
_type = 'malware'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -195,7 +188,7 @@ class Malware(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -206,11 +199,10 @@ class ObservedData(STIXDomainObject):
"""
_type = 'observed-data'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_observed', TimestampProperty(required=True)),
@ -220,7 +212,7 @@ class ObservedData(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -237,11 +229,10 @@ class Report(STIXDomainObject):
"""
_type = 'report'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -251,7 +242,7 @@ class Report(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -262,11 +253,10 @@ class ThreatActor(STIXDomainObject):
"""
_type = 'threat-actor'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -282,7 +272,7 @@ class ThreatActor(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -293,11 +283,10 @@ class Tool(STIXDomainObject):
"""
_type = 'tool'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -307,7 +296,7 @@ class Tool(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -318,11 +307,10 @@ class Vulnerability(STIXDomainObject):
"""
_type = 'vulnerability'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -330,7 +318,7 @@ class Vulnerability(STIXDomainObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -371,11 +359,10 @@ def CustomObject(type='x-custom-type', properties=None):
raise ValueError("Invalid type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
])
@ -383,19 +370,19 @@ def CustomObject(type='x-custom-type', properties=None):
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update([x for x in properties if not x[0].startswith("x_")])
_properties.update([x for x in properties if not x[0].startswith('x_')])
# This is to follow the general properties structure.
_properties.update([
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
# Put all custom properties at the bottom, sorted alphabetically.
_properties.update(sorted([x for x in properties if x[0].startswith("x_")], key=lambda x: x[0]))
_properties.update(sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]))
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
@ -408,7 +395,7 @@ def CustomObject(type='x-custom-type', properties=None):
return
raise e
_register_type(_Custom, version="2.0")
_register_type(_Custom, version='2.0')
return _Custom
return custom_builder

View File

@ -21,11 +21,10 @@ class Relationship(STIXRelationshipObject):
"""
_type = 'relationship'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('relationship_type', StringProperty(required=True)),
@ -35,7 +34,7 @@ class Relationship(STIXRelationshipObject):
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -59,24 +58,23 @@ class Sighting(STIXRelationshipObject):
"""
_type = 'sighting'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty()),
('sighting_of_ref', ReferenceProperty(required=True)),
('observed_data_refs', ListProperty(ReferenceProperty(type="observed-data"))),
('where_sighted_refs', ListProperty(ReferenceProperty(type="identity"))),
('observed_data_refs', ListProperty(ReferenceProperty(type='observed-data'))),
('where_sighted_refs', ListProperty(ReferenceProperty(type='identity'))),
('summary', BooleanProperty(default=lambda: False)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])

View File

@ -39,8 +39,7 @@ class Bundle(_STIXBase):
"""
_type = 'bundle'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('objects', ListProperty(STIXObjectProperty)),

View File

@ -13,8 +13,7 @@ from .properties import (BooleanProperty, DictionaryProperty, HashesProperty,
class ExternalReference(_STIXBase):
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('source_name', StringProperty(required=True)),
('description', StringProperty()),
('url', StringProperty()),
@ -24,13 +23,12 @@ class ExternalReference(_STIXBase):
def _check_object_constraints(self):
super(ExternalReference, self)._check_object_constraints()
self._check_at_least_one_property(["description", "external_id", "url"])
self._check_at_least_one_property(['description', 'external_id', 'url'])
class KillChainPhase(_STIXBase):
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('kill_chain_name', StringProperty(required=True)),
('phase_name', StringProperty(required=True)),
])
@ -38,26 +36,24 @@ class KillChainPhase(_STIXBase):
class GranularMarking(_STIXBase):
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('lang', StringProperty()),
('marking_ref', ReferenceProperty(type="marking-definition")),
('marking_ref', ReferenceProperty(type='marking-definition')),
('selectors', ListProperty(SelectorProperty, required=True)),
])
def _check_object_constraints(self):
super(GranularMarking, self)._check_object_constraints()
self._check_at_least_one_property(["lang", "marking_ref"])
self._check_at_least_one_property(['lang', 'marking_ref'])
class LanguageContent(_STIXBase):
_type = 'language-content'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('object_ref', ReferenceProperty(required=True)),
@ -68,7 +64,7 @@ class LanguageContent(_STIXBase):
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -76,8 +72,7 @@ class LanguageContent(_STIXBase):
class TLPMarking(_STIXBase):
_type = 'tlp'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('tlp', Property(required=True))
])
@ -85,8 +80,7 @@ class TLPMarking(_STIXBase):
class StatementMarking(_STIXBase):
_type = 'statement'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('statement', StringProperty(required=True))
])
@ -113,14 +107,13 @@ class MarkingProperty(Property):
class MarkingDefinition(_STIXBase, _MarkingsMixin):
_type = 'marking-definition'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
('definition_type', StringProperty(required=True)),
('definition', MarkingProperty(required=True)),
@ -170,12 +163,11 @@ def CustomMarking(type='x-custom-marking', properties=None):
class _Custom(cls, _STIXBase):
_type = type
_properties = OrderedDict()
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties)
_properties = OrderedDict(properties)
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
@ -197,29 +189,29 @@ def CustomMarking(type='x-custom-marking', properties=None):
# TODO: don't allow the creation of any other TLPMarkings than the ones below
TLP_WHITE = MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="white")
id='marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='white')
)
TLP_GREEN = MarkingDefinition(
id="marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="green")
id='marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='green')
)
TLP_AMBER = MarkingDefinition(
id="marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="amber")
id='marking-definition--f88d31f6-486f-44da-b317-01333bde0b82',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='amber')
)
TLP_RED = MarkingDefinition(
id="marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="red")
id='marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed',
created='2017-01-20T00:00:00.000Z',
definition_type='tlp',
definition=TLPMarking(tlp='red')
)

View File

@ -103,8 +103,7 @@ class Artifact(_Observable):
"""
_type = 'artifact'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('mime_type', StringProperty()),
('payload_bin', BinaryProperty()),
@ -115,8 +114,8 @@ class Artifact(_Observable):
def _check_object_constraints(self):
super(Artifact, self)._check_object_constraints()
self._check_mutually_exclusive_properties(["payload_bin", "url"])
self._check_properties_dependency(["hashes"], ["url"])
self._check_mutually_exclusive_properties(['payload_bin', 'url'])
self._check_properties_dependency(['hashes'], ['url'])
class AutonomousSystem(_Observable):
@ -126,8 +125,7 @@ class AutonomousSystem(_Observable):
"""
_type = 'autonomous-system'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('number', IntegerProperty(required=True)),
('name', StringProperty()),
@ -143,8 +141,7 @@ class Directory(_Observable):
"""
_type = 'directory'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('path', StringProperty(required=True)),
('path_enc', StringProperty()),
@ -164,8 +161,7 @@ class DomainName(_Observable):
"""
_type = 'domain-name'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
@ -180,8 +176,7 @@ class EmailAddress(_Observable):
"""
_type = 'email-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('display_name', StringProperty()),
@ -196,8 +191,7 @@ class EmailMIMEComponent(_STIXBase):
`the STIX 2.1 specification <link here>`__.
"""
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('body', StringProperty()),
('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])),
('content_type', StringProperty()),
@ -206,7 +200,7 @@ class EmailMIMEComponent(_STIXBase):
def _check_object_constraints(self):
super(EmailMIMEComponent, self)._check_object_constraints()
self._check_at_least_one_property(["body", "body_raw_ref"])
self._check_at_least_one_property(['body', 'body_raw_ref'])
class EmailMessage(_Observable):
@ -216,8 +210,7 @@ class EmailMessage(_Observable):
"""
_type = 'email-message'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('is_multipart', BooleanProperty(required=True)),
('date', TimestampProperty()),
@ -238,10 +231,10 @@ class EmailMessage(_Observable):
def _check_object_constraints(self):
super(EmailMessage, self)._check_object_constraints()
self._check_properties_dependency(["is_multipart"], ["body_multipart"])
if self.get("is_multipart") is True and self.get("body"):
self._check_properties_dependency(['is_multipart'], ['body_multipart'])
if self.get('is_multipart') is True and self.get('body'):
# 'body' MAY only be used if is_multipart is false.
raise DependentPropertiesError(self.__class__, [("is_multipart", "body")])
raise DependentPropertiesError(self.__class__, [('is_multipart', 'body')])
class ArchiveExt(_Extension):
@ -251,8 +244,7 @@ class ArchiveExt(_Extension):
"""
_type = 'archive-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
('version', StringProperty()),
('comment', StringProperty()),
@ -265,8 +257,7 @@ class AlternateDataStream(_STIXBase):
`the STIX 2.1 specification <link here>`__.
"""
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('name', StringProperty(required=True)),
('hashes', HashesProperty()),
('size', IntegerProperty()),
@ -280,8 +271,7 @@ class NTFSExt(_Extension):
"""
_type = 'ntfs-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('sid', StringProperty()),
('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))),
])
@ -294,8 +284,7 @@ class PDFExt(_Extension):
"""
_type = 'pdf-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('version', StringProperty()),
('is_optimized', BooleanProperty()),
('document_info_dict', DictionaryProperty()),
@ -311,8 +300,7 @@ class RasterImageExt(_Extension):
"""
_type = 'raster-image-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('image_height', IntegerProperty()),
('image_width', IntegerProperty()),
('bits_per_pixel', IntegerProperty()),
@ -327,8 +315,7 @@ class WindowsPEOptionalHeaderType(_STIXBase):
`the STIX 2.1 specification <link here>`__.
"""
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('magic_hex', HexProperty()),
('major_linker_version', IntegerProperty()),
('minor_linker_version', IntegerProperty()),
@ -373,8 +360,7 @@ class WindowsPESection(_STIXBase):
`the STIX 2.1 specification <link here>`__.
"""
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('name', StringProperty(required=True)),
('size', IntegerProperty()),
('entropy', FloatProperty()),
@ -389,8 +375,7 @@ class WindowsPEBinaryExt(_Extension):
"""
_type = 'windows-pebinary-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('pe_type', StringProperty(required=True)), # open_vocab
('imphash', StringProperty()),
('machine_hex', HexProperty()),
@ -413,8 +398,7 @@ class File(_Observable):
"""
_type = 'file'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('hashes', HashesProperty()),
('size', IntegerProperty()),
@ -437,8 +421,8 @@ class File(_Observable):
def _check_object_constraints(self):
super(File, self)._check_object_constraints()
self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"])
self._check_at_least_one_property(["hashes", "name"])
self._check_properties_dependency(['is_encrypted'], ['encryption_algorithm', 'decryption_key'])
self._check_at_least_one_property(['hashes', 'name'])
class IPv4Address(_Observable):
@ -448,8 +432,7 @@ class IPv4Address(_Observable):
"""
_type = 'ipv4-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
@ -465,8 +448,7 @@ class IPv6Address(_Observable):
"""
_type = 'ipv6-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
@ -482,8 +464,7 @@ class MACAddress(_Observable):
"""
_type = 'mac-addr'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
@ -497,8 +478,7 @@ class Mutex(_Observable):
"""
_type = 'mutex'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('name', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
@ -512,8 +492,7 @@ class HTTPRequestExt(_Extension):
"""
_type = 'http-request-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('request_method', StringProperty(required=True)),
('request_value', StringProperty(required=True)),
('request_version', StringProperty()),
@ -530,8 +509,7 @@ class ICMPExt(_Extension):
"""
_type = 'icmp-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('icmp_type_hex', HexProperty(required=True)),
('icmp_code_hex', HexProperty(required=True)),
])
@ -544,8 +522,7 @@ class SocketExt(_Extension):
"""
_type = 'socket-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('address_family', EnumProperty(allowed=[
"AF_UNSPEC",
"AF_INET",
@ -586,8 +563,7 @@ class TCPExt(_Extension):
"""
_type = 'tcp-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('src_flags_hex', HexProperty()),
('dst_flags_hex', HexProperty()),
])
@ -600,8 +576,7 @@ class NetworkTraffic(_Observable):
"""
_type = 'network-traffic'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('start', TimestampProperty()),
('end', TimestampProperty()),
@ -625,7 +600,7 @@ class NetworkTraffic(_Observable):
def _check_object_constraints(self):
super(NetworkTraffic, self)._check_object_constraints()
self._check_at_least_one_property(["src_ref", "dst_ref"])
self._check_at_least_one_property(['src_ref', 'dst_ref'])
class WindowsProcessExt(_Extension):
@ -635,8 +610,7 @@ class WindowsProcessExt(_Extension):
"""
_type = 'windows-process-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('aslr_enabled', BooleanProperty()),
('dep_enabled', BooleanProperty()),
('priority', StringProperty()),
@ -653,8 +627,7 @@ class WindowsServiceExt(_Extension):
"""
_type = 'windows-service-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('service_name', StringProperty(required=True)),
('descriptions', ListProperty(StringProperty)),
('display_name', StringProperty()),
@ -692,8 +665,7 @@ class Process(_Observable):
"""
_type = 'process'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('is_hidden', BooleanProperty()),
('pid', IntegerProperty()),
@ -717,14 +689,14 @@ class Process(_Observable):
super(Process, self)._check_object_constraints()
try:
self._check_at_least_one_property()
if "windows-process-ext" in self.get('extensions', {}):
self.extensions["windows-process-ext"]._check_at_least_one_property()
if 'windows-process-ext' in self.get('extensions', {}):
self.extensions['windows-process-ext']._check_at_least_one_property()
except AtLeastOnePropertyError as enclosing_exc:
if 'extensions' not in self:
raise enclosing_exc
else:
if "windows-process-ext" in self.get('extensions', {}):
self.extensions["windows-process-ext"]._check_at_least_one_property()
if 'windows-process-ext' in self.get('extensions', {}):
self.extensions['windows-process-ext']._check_at_least_one_property()
class Software(_Observable):
@ -734,8 +706,7 @@ class Software(_Observable):
"""
_type = 'software'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('name', StringProperty(required=True)),
('cpe', StringProperty()),
@ -753,8 +724,7 @@ class URL(_Observable):
"""
_type = 'url'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
@ -768,8 +738,7 @@ class UNIXAccountExt(_Extension):
"""
_type = 'unix-account-ext'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('gid', IntegerProperty()),
('groups', ListProperty(StringProperty)),
('home_dir', StringProperty()),
@ -784,8 +753,7 @@ class UserAccount(_Observable):
"""
_type = 'user-account'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('user_id', StringProperty(required=True)),
('account_login', StringProperty()),
@ -811,24 +779,23 @@ class WindowsRegistryValueType(_STIXBase):
"""
_type = 'windows-registry-value-type'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('name', StringProperty(required=True)),
('data', StringProperty()),
('data_type', EnumProperty(allowed=[
'REG_NONE',
'REG_SZ',
'REG_EXPAND_SZ',
'REG_BINARY',
'REG_DWORD',
'REG_DWORD_BIG_ENDIAN',
'REG_LINK',
'REG_MULTI_SZ',
'REG_RESOURCE_LIST',
'REG_FULL_RESOURCE_DESCRIPTION',
'REG_RESOURCE_REQUIREMENTS_LIST',
'REG_QWORD',
'REG_INVALID_TYPE',
"REG_NONE",
"REG_SZ",
"REG_EXPAND_SZ",
"REG_BINARY",
"REG_DWORD",
"REG_DWORD_BIG_ENDIAN",
"REG_LINK",
"REG_MULTI_SZ",
"REG_RESOURCE_LIST",
"REG_FULL_RESOURCE_DESCRIPTION",
"REG_RESOURCE_REQUIREMENTS_LIST",
"REG_QWORD",
"REG_INVALID_TYPE",
])),
])
@ -840,8 +807,7 @@ class WindowsRegistryKey(_Observable):
"""
_type = 'windows-registry-key'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('key', StringProperty(required=True)),
('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))),
@ -865,8 +831,7 @@ class X509V3ExtenstionsType(_STIXBase):
"""
_type = 'x509-v3-extensions-type'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('basic_constraints', StringProperty()),
('name_constraints', StringProperty()),
('policy_constraints', StringProperty()),
@ -893,8 +858,7 @@ class X509Certificate(_Observable):
"""
_type = 'x509-certificate'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('is_self_signed', BooleanProperty()),
('hashes', HashesProperty()),
@ -1038,8 +1002,7 @@ def CustomObservable(type='x-custom-observable', properties=None):
raise ValueError("Invalid observable type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
])
@ -1117,12 +1080,11 @@ def CustomExtension(observable=None, type='x-custom-observable', properties=None
raise ValueError("Invalid extension type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict()
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties)
_properties = OrderedDict(properties)
def __init__(self, **kwargs):
_Extension.__init__(self, **kwargs)

View File

@ -27,11 +27,10 @@ class AttackPattern(STIXDomainObject):
"""
_type = 'attack-pattern'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -42,7 +41,7 @@ class AttackPattern(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -54,11 +53,10 @@ class Campaign(STIXDomainObject):
"""
_type = 'campaign'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -72,7 +70,7 @@ class Campaign(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -84,11 +82,10 @@ class CourseOfAction(STIXDomainObject):
"""
_type = 'course-of-action'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -98,7 +95,7 @@ class CourseOfAction(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -110,11 +107,10 @@ class Identity(STIXDomainObject):
"""
_type = 'identity'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -127,7 +123,7 @@ class Identity(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -139,11 +135,10 @@ class Indicator(STIXDomainObject):
"""
_type = 'indicator'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty()),
@ -157,7 +152,7 @@ class Indicator(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -169,11 +164,10 @@ class IntrusionSet(STIXDomainObject):
"""
_type = 'intrusion-set'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -190,7 +184,7 @@ class IntrusionSet(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -202,12 +196,11 @@ class Location(STIXDomainObject):
"""
_type = 'location'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('description', StringProperty()),
@ -225,15 +218,14 @@ class Location(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
class AnalysisType(_STIXBase):
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('start_time', TimestampProperty()),
('end_time', TimestampProperty()),
('analysis_tools', ObservableProperty()),
@ -244,8 +236,7 @@ class AnalysisType(_STIXBase):
class AVResultsType(_STIXBase):
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('product', StringProperty()),
('engine_version', StringProperty()),
('definition_version', StringProperty()),
@ -263,12 +254,11 @@ class Malware(STIXDomainObject):
"""
_type = 'malware'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -279,7 +269,7 @@ class Malware(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
('is_family', BooleanProperty(required=True)),
('first_seen', TimestampProperty()),
@ -302,12 +292,11 @@ class Note(STIXDomainObject):
"""
_type = 'note'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('summary', StringProperty()),
@ -319,7 +308,7 @@ class Note(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -331,11 +320,10 @@ class ObservedData(STIXDomainObject):
"""
_type = 'observed-data'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_observed', TimestampProperty(required=True)),
@ -347,7 +335,7 @@ class ObservedData(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -365,12 +353,11 @@ class Opinion(STIXDomainObject):
"""
_type = 'opinion'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('description', StringProperty()),
@ -388,7 +375,7 @@ class Opinion(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -400,11 +387,10 @@ class Report(STIXDomainObject):
"""
_type = 'report'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -416,7 +402,7 @@ class Report(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -428,11 +414,10 @@ class ThreatActor(STIXDomainObject):
"""
_type = 'threat-actor'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -450,7 +435,7 @@ class ThreatActor(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -462,11 +447,10 @@ class Tool(STIXDomainObject):
"""
_type = 'tool'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -478,7 +462,7 @@ class Tool(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -490,11 +474,10 @@ class Vulnerability(STIXDomainObject):
"""
_type = 'vulnerability'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
@ -504,7 +487,7 @@ class Vulnerability(STIXDomainObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -545,11 +528,10 @@ def CustomObject(type='x-custom-type', properties=None):
raise ValueError("Invalid type name '%s': must be between 3 and 250 characters." % type)
_type = type
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
])
@ -557,7 +539,7 @@ def CustomObject(type='x-custom-type', properties=None):
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update([x for x in properties if not x[0].startswith("x_")])
_properties.update([x for x in properties if not x[0].startswith('x_')])
# This is to follow the general properties structure.
_properties.update([
@ -566,12 +548,12 @@ def CustomObject(type='x-custom-type', properties=None):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
# Put all custom properties at the bottom, sorted alphabetically.
_properties.update(sorted([x for x in properties if x[0].startswith("x_")], key=lambda x: x[0]))
_properties.update(sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]))
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
@ -584,7 +566,7 @@ def CustomObject(type='x-custom-type', properties=None):
return
raise e
_register_type(_Custom, version="2.1")
_register_type(_Custom, version='2.1')
return _Custom
return custom_builder

View File

@ -22,12 +22,11 @@ class Relationship(STIXRelationshipObject):
"""
_type = 'relationship'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed="2.1")),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('relationship_type', StringProperty(required=True)),
@ -39,7 +38,7 @@ class Relationship(STIXRelationshipObject):
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])
@ -64,26 +63,25 @@ class Sighting(STIXRelationshipObject):
"""
_type = 'sighting'
_properties = OrderedDict()
_properties.update([
_properties = OrderedDict([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty()),
('sighting_of_ref', ReferenceProperty(required=True)),
('observed_data_refs', ListProperty(ReferenceProperty(type="observed-data"))),
('where_sighted_refs', ListProperty(ReferenceProperty(type="identity"))),
('observed_data_refs', ListProperty(ReferenceProperty(type='observed-data'))),
('where_sighted_refs', ListProperty(ReferenceProperty(type='identity'))),
('summary', BooleanProperty()),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),
])

View File

@ -1 +1 @@
__version__ = "1.0.2"
__version__ = '1.0.2'