Merge pull request #100 from emmanvg/issue-98

Performance, Design and other fixes
stix2.0
Greg Back 2017-11-08 23:29:28 +00:00 committed by GitHub
commit ef6dade6f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 328 additions and 314 deletions

View File

@ -99,101 +99,28 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<style type=\"text/css\">.highlight .hll { background-color: #ffffcc }\n",
".highlight { background: #f8f8f8; }\n",
".highlight .c { color: #408080; font-style: italic } /* Comment */\n",
".highlight .err { border: 1px solid #FF0000 } /* Error */\n",
".highlight .k { color: #008000; font-weight: bold } /* Keyword */\n",
".highlight .o { color: #666666 } /* Operator */\n",
".highlight .ch { color: #408080; font-style: italic } /* Comment.Hashbang */\n",
".highlight .cm { color: #408080; font-style: italic } /* Comment.Multiline */\n",
".highlight .cp { color: #BC7A00 } /* Comment.Preproc */\n",
".highlight .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */\n",
".highlight .c1 { color: #408080; font-style: italic } /* Comment.Single */\n",
".highlight .cs { color: #408080; font-style: italic } /* Comment.Special */\n",
".highlight .gd { color: #A00000 } /* Generic.Deleted */\n",
".highlight .ge { font-style: italic } /* Generic.Emph */\n",
".highlight .gr { color: #FF0000 } /* Generic.Error */\n",
".highlight .gh { color: #000080; font-weight: bold } /* Generic.Heading */\n",
".highlight .gi { color: #00A000 } /* Generic.Inserted */\n",
".highlight .go { color: #888888 } /* Generic.Output */\n",
".highlight .gp { color: #000080; font-weight: bold } /* Generic.Prompt */\n",
".highlight .gs { font-weight: bold } /* Generic.Strong */\n",
".highlight .gu { color: #800080; font-weight: bold } /* Generic.Subheading */\n",
".highlight .gt { color: #0044DD } /* Generic.Traceback */\n",
".highlight .kc { color: #008000; font-weight: bold } /* Keyword.Constant */\n",
".highlight .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */\n",
".highlight .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */\n",
".highlight .kp { color: #008000 } /* Keyword.Pseudo */\n",
".highlight .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */\n",
".highlight .kt { color: #B00040 } /* Keyword.Type */\n",
".highlight .m { color: #666666 } /* Literal.Number */\n",
".highlight .s { color: #BA2121 } /* Literal.String */\n",
".highlight .na { color: #7D9029 } /* Name.Attribute */\n",
".highlight .nb { color: #008000 } /* Name.Builtin */\n",
".highlight .nc { color: #0000FF; font-weight: bold } /* Name.Class */\n",
".highlight .no { color: #880000 } /* Name.Constant */\n",
".highlight .nd { color: #AA22FF } /* Name.Decorator */\n",
".highlight .ni { color: #999999; font-weight: bold } /* Name.Entity */\n",
".highlight .ne { color: #D2413A; font-weight: bold } /* Name.Exception */\n",
".highlight .nf { color: #0000FF } /* Name.Function */\n",
".highlight .nl { color: #A0A000 } /* Name.Label */\n",
".highlight .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */\n",
".highlight .nt { color: #008000; font-weight: bold } /* Name.Tag */\n",
".highlight .nv { color: #19177C } /* Name.Variable */\n",
".highlight .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */\n",
".highlight .w { color: #bbbbbb } /* Text.Whitespace */\n",
".highlight .mb { color: #666666 } /* Literal.Number.Bin */\n",
".highlight .mf { color: #666666 } /* Literal.Number.Float */\n",
".highlight .mh { color: #666666 } /* Literal.Number.Hex */\n",
".highlight .mi { color: #666666 } /* Literal.Number.Integer */\n",
".highlight .mo { color: #666666 } /* Literal.Number.Oct */\n",
".highlight .sa { color: #BA2121 } /* Literal.String.Affix */\n",
".highlight .sb { color: #BA2121 } /* Literal.String.Backtick */\n",
".highlight .sc { color: #BA2121 } /* Literal.String.Char */\n",
".highlight .dl { color: #BA2121 } /* Literal.String.Delimiter */\n",
".highlight .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */\n",
".highlight .s2 { color: #BA2121 } /* Literal.String.Double */\n",
".highlight .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */\n",
".highlight .sh { color: #BA2121 } /* Literal.String.Heredoc */\n",
".highlight .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */\n",
".highlight .sx { color: #008000 } /* Literal.String.Other */\n",
".highlight .sr { color: #BB6688 } /* Literal.String.Regex */\n",
".highlight .s1 { color: #BA2121 } /* Literal.String.Single */\n",
".highlight .ss { color: #19177C } /* Literal.String.Symbol */\n",
".highlight .bp { color: #008000 } /* Name.Builtin.Pseudo */\n",
".highlight .fm { color: #0000FF } /* Name.Function.Magic */\n",
".highlight .vc { color: #19177C } /* Name.Variable.Class */\n",
".highlight .vg { color: #19177C } /* Name.Variable.Global */\n",
".highlight .vi { color: #19177C } /* Name.Variable.Instance */\n",
".highlight .vm { color: #19177C } /* Name.Variable.Magic */\n",
".highlight .il { color: #666666 } /* Literal.Number.Integer.Long */</style><div class=\"highlight\"><pre><span></span><span class=\"p\">{</span>\n",
" <span class=\"nt\">&quot;x_foo&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;bar&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;type&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;identity&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;id&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;identity--8d7f0697-e589-4e3b-aa57-cae798d2d138&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;created&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2017-09-26T21:02:19.465Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;modified&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2017-09-26T21:02:19.465Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;name&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;John Smith&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;identity_class&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;individual&quot;</span>\n",
"<span class=\"p\">}</span>\n",
"</pre></div>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
"name": "stdout",
"output_type": "stream",
"text": [
"{\n",
" \"type\": \"identity\",\n",
" \"id\": \"identity--10761df2-93f6-4eb4-9d02-4fccfe5dc91d\",\n",
" \"created\": \"2017-11-03T18:20:48.145Z\",\n",
" \"modified\": \"2017-11-03T18:20:48.145Z\",\n",
" \"name\": \"John Smith\",\n",
" \"identity_class\": \"individual\",\n",
" \"x_foo\": \"bar\"\n",
"}\n"
]
}
],
"source": [
"from stix2 import Identity\n",
"\n",
"identity = Identity(name=\"John Smith\",\n",
" identity_class=\"individual\",\n",
" custom_properties={\n",
@ -923,14 +850,14 @@
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,

View File

@ -114,7 +114,6 @@
"import stix2\n",
"\n",
"stix2.v20.Indicator()\n",
"\n",
"stix2.v21.Indicator()"
]
},
@ -169,9 +168,17 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\n \"type\": \"indicator\",\n \"id\": \"indicator--dbcbd659-c927-4f9a-994f-0a2632274394\",\n \"created\": \"2017-09-26T23:33:39.829Z\",\n \"modified\": \"2017-09-26T23:33:39.829Z\",\n \"name\": \"File hash for malware variant\",\n \"pattern\": \"[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']\",\n \"valid_from\": \"2017-09-26T23:33:39.829952Z\",\n \"labels\": [\n \"malicious-activity\"\n ]\n}\n"
]
}
],
"source": [
"from stix2 import parse\n",
"\n",

View File

@ -153,15 +153,7 @@ class _STIXBase(collections.Mapping):
super(_STIXBase, self).__setattr__(name, value)
def __str__(self):
properties = self.object_properties()
def sort_by(element):
return find_property_index(self, properties, element)
# separators kwarg -> don't include spaces after commas.
return json.dumps(self, indent=4, cls=STIXJSONEncoder,
item_sort_key=sort_by,
separators=(",", ": "))
return self.serialize(pretty=True)
def __repr__(self):
props = [(k, self[k]) for k in self.object_properties() if self.get(k)]
@ -185,6 +177,38 @@ class _STIXBase(collections.Mapping):
def revoke(self):
return _revoke(self)
def serialize(self, pretty=False, **kwargs):
"""
Serialize a STIX object.
Args:
pretty (bool): If True, output properties following the STIX specs
formatting. This includes indentation. Refer to notes for more
details.
**kwargs: The arguments for a json.dumps() call.
Returns:
dict: The serialized JSON object.
Note:
The argument ``pretty=True`` will output the STIX object following
spec order. Using this argument greatly impacts object serialization
performance. If your use case is centered across machine-to-machine
operation it is recommended to set ``pretty=False``.
When ``pretty=True`` the following key-value pairs will be added or
overridden: indent=4, separators=(",", ": "), item_sort_key=sort_by.
"""
if pretty:
properties = self.object_properties()
def sort_by(element):
return find_property_index(self, properties, element)
kwargs.update({'indent': 4, 'separators': (",", ": "), 'item_sort_key': sort_by})
return json.dumps(self, cls=STIXJSONEncoder, **kwargs)
class _Observable(_STIXBase):

View File

@ -11,8 +11,11 @@
|
"""
from abc import ABCMeta, abstractmethod
import uuid
from six import with_metaclass
from stix2.utils import deduplicate
@ -21,94 +24,85 @@ def make_id():
class DataStore(object):
"""An implementer will create a concrete subclass from
this class for the specific DataStore.
"""An implementer can subclass to create custom behavior from
this class for the specific DataStores.
Args:
source (DataSource): An existing DataSource to use
as this DataStore's DataSource component
sink (DataSink): An existing DataSink to use
as this DataStore's DataSink component
Attributes:
id (str): A unique UUIDv4 to identify this DataStore.
source (DataSource): An object that implements DataSource class.
sink (DataSink): An object that implements DataSink class.
"""
def __init__(self, source=None, sink=None):
super(DataStore, self).__init__()
self.id = make_id()
self.source = source
self.sink = sink
def get(self, stix_id, allow_custom=False):
def get(self, *args, **kwargs):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id, allow_custom=allow_custom)
return self.source.get(*args, **kwargs)
def all_versions(self, stix_id, allow_custom=False):
def all_versions(self, *args, **kwargs):
"""Retrieve all versions of a single STIX object by ID.
Implement: Translate all_versions() call to the appropriate DataSource call
Translate all_versions() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id, allow_custom=allow_custom)
return self.source.all_versions(*args, **kwargs)
def query(self, query=None, allow_custom=False):
def query(self, *args, **kwargs):
"""Retrieve STIX objects matching a set of filters.
Implement: Specific data source API calls, processing,
functionality required for retrieving query from the data source.
Translate query() call to the appropriate DataSource call.
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.query(query=query)
return self.source.query(*args, **kwargs)
def add(self, stix_objs, allow_custom=False):
"""Store STIX objects.
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
Translates add() to the appropriate DataSink call.
Define custom behavior before storing STIX objects using the associated
DataSink. Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
return self.sink.add(stix_objs, allow_custom=allow_custom)
return self.sink.add(*args, **kwargs)
class DataSink(object):
class DataSink(with_metaclass(ABCMeta)):
"""An implementer will create a concrete subclass from
this class for the specific DataSink.
@ -117,10 +111,12 @@ class DataSink(object):
"""
def __init__(self):
super(DataSink, self).__init__()
self.id = make_id()
def add(self, stix_objs, allow_custom=False):
"""Store STIX objects.
@abstractmethod
def add(self, stix_objs):
"""Method for storing STIX objects.
Implement: Specific data sink API calls, processing,
functionality required for adding data to the sink
@ -128,28 +124,26 @@ class DataSink(object):
Args:
stix_objs (list): a list of STIX objects (where each object is a
STIX object)
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
raise NotImplementedError()
class DataSource(object):
class DataSource(with_metaclass(ABCMeta)):
"""An implementer will create a concrete subclass from
this class for the specific DataSource.
Attributes:
id (str): A unique UUIDv4 to identify this DataSource.
_filters (set): A collection of filters attached to this DataSource.
filters (set): A collection of filters attached to this DataSource.
"""
def __init__(self):
super(DataSource, self).__init__()
self.id = make_id()
self.filters = set()
def get(self, stix_id, _composite_filters=None, allow_custom=False):
@abstractmethod
def get(self, stix_id):
"""
Implement: Specific data source API calls, processing,
functionality required for retrieving data from the data source
@ -158,21 +152,17 @@ class DataSource(object):
stix_id (str): the id of the STIX 2.0 object to retrieve. Should
return a single object, the most recent version of the object
specified by the "id".
_composite_filters (set): set of filters passed from the parent
the CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the STIX object
"""
raise NotImplementedError()
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
@abstractmethod
def all_versions(self, stix_id):
"""
Implement: Similar to get() except returns list of all object versions of
the specified "id". In addition, implement the specific data
Implement: Similar to get() except returns list of all object versions
of the specified "id". In addition, implement the specific data
source API calls, processing, functionality required for retrieving
data from the data source.
@ -180,35 +170,26 @@ class DataSource(object):
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
return a list of objects, all the versions of the object
specified by the "id".
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
"""
raise NotImplementedError()
def query(self, query=None, _composite_filters=None, allow_custom=False):
@abstractmethod
def query(self, query=None):
"""
Implement:Implement the specific data source API calls, processing,
Implement: The specific data source API calls, processing,
functionality required for retrieving query from the data source
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on
_composite_filters (set): a set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
to conduct search on.
Returns:
stix_objs (list): a list of STIX objects
"""
raise NotImplementedError()
class CompositeDataSource(DataSource):
@ -224,7 +205,7 @@ class CompositeDataSource(DataSource):
Attributes:
data_sources (dict): A dictionary of DataSource objects; to be
data_sources (list): A dictionary of DataSource objects; to be
controlled and used by the Data Source Controller object.
"""
@ -237,7 +218,7 @@ class CompositeDataSource(DataSource):
super(CompositeDataSource, self).__init__()
self.data_sources = []
def get(self, stix_id, _composite_filters=None, allow_custom=False):
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object by STIX ID
Federated retrieve method, iterates through all DataSources
@ -253,9 +234,7 @@ class CompositeDataSource(DataSource):
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to another parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
to another parent CompositeDataSource), not user supplied.
Returns:
stix_obj: the STIX object to be returned.
@ -273,7 +252,7 @@ class CompositeDataSource(DataSource):
# for every configured Data Source, call its retrieve handler
for ds in self.data_sources:
data = ds.get(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
data = ds.get(stix_id=stix_id, _composite_filters=all_filters)
if data:
all_data.append(data)
@ -288,22 +267,20 @@ class CompositeDataSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects by STIX ID
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve all versions of a STIX object by STIX ID.
Federated all_versions retrieve method - iterates through all DataSources
defined in "data_sources"
Federated all_versions retrieve method - iterates through all
DataSources defined in "data_sources".
A composite data source will pass its attached filters to
each configured data source, pushing filtering to them to handle
each configured data source, pushing filtering to them to handle.
Args:
stix_id (str): id of the STIX objects to retrieve
stix_id (str): id of the STIX objects to retrieve.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to a parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
Returns:
all_data (list): list of STIX objects that have the specified id
@ -322,7 +299,7 @@ class CompositeDataSource(DataSource):
# retrieve STIX objects from all configured data sources
for ds in self.data_sources:
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0 objects
@ -332,19 +309,17 @@ class CompositeDataSource(DataSource):
return all_data
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects that match query
def query(self, query=None, _composite_filters=None):
"""Retrieve STIX objects that match a query.
Federate the query to all DataSources attached to the
Composite Data Source.
Args:
query (list): list of filters to search on
query (list): list of filters to search on.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to a parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
Returns:
all_data (list): list of STIX objects to be returned
@ -354,7 +329,7 @@ class CompositeDataSource(DataSource):
raise AttributeError('CompositeDataSource has no data sources')
if not query:
# dont mess with the query (i.e. convert to a set, as thats done
# don't mess with the query (i.e. convert to a set, as that's done
# within the specific DataSources that are called)
query = []
@ -369,7 +344,7 @@ class CompositeDataSource(DataSource):
# federate query to all attached data sources,
# pass composite filters to id
for ds in self.data_sources:
data = ds.query(query=query, _composite_filters=all_filters, allow_custom=allow_custom)
data = ds.query(query=query, _composite_filters=all_filters)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0

View File

@ -26,14 +26,15 @@ class FileSystemStore(DataStore):
Default: False.
Attributes:
source (FileSystemSource): FuleSystemSource
source (FileSystemSource): FileSystemSource
sink (FileSystemSink): FileSystemSink
"""
def __init__(self, stix_dir, bundlify=False):
super(FileSystemStore, self).__init__()
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
super(FileSystemStore, self).__init__(
source=FileSystemSource(stix_dir=stix_dir),
sink=FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
)
class FileSystemSink(DataSink):
@ -99,11 +100,11 @@ class FileSystemSink(DataSink):
self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom, version)
stix_data = parse(stix_data, allow_custom=allow_custom, version=version)
if stix_data["type"] == "bundle":
# extract STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj)
self.add(stix_obj, allow_custom=allow_custom, version=version)
else:
# adding json-formatted STIX
self._check_path_and_write(stix_data)
@ -111,12 +112,12 @@ class FileSystemSink(DataSink):
elif isinstance(stix_data, Bundle):
# recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj)
self.add(stix_obj, allow_custom=allow_custom, version=version)
elif isinstance(stix_data, list):
# recursively add individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj)
self.add(stix_obj, allow_custom=allow_custom, version=version)
else:
raise TypeError("stix_data must be a STIX object (or list of), "
@ -146,7 +147,7 @@ class FileSystemSource(DataSource):
def stix_dir(self):
return self._stix_dir
def get(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from file directory via STIX ID.
Args:
@ -166,8 +167,7 @@ class FileSystemSource(DataSource):
"""
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters,
allow_custom=allow_custom, version=version)
all_data = self.query(query=query, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)
if all_data:
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
@ -176,7 +176,7 @@ class FileSystemSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from file directory via STIX ID, all versions.
Note: Since FileSystem sources/sinks don't handle multiple versions
@ -197,10 +197,9 @@ class FileSystemSource(DataSource):
a python STIX objects and then returned
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters,
allow_custom=allow_custom, version=version)]
return [self.get(stix_id=stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None, allow_custom=False, version=None):
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
@ -305,7 +304,7 @@ class FileSystemSource(DataSource):
all_data = deduplicate(all_data)
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom, version) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom, version=version) for stix_obj_dict in all_data]
return stix_objs

View File

@ -24,7 +24,7 @@ from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
def _add(store, stix_data=None, allow_custom=False):
def _add(store, stix_data=None, allow_custom=False, version=None):
"""Add STIX objects to MemoryStore/Sink.
Adds STIX objects to an in-memory dictionary for fast lookup.
@ -34,6 +34,8 @@ def _add(store, stix_data=None, allow_custom=False):
stix_data (list OR dict OR STIX object): STIX objects to be added
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if isinstance(stix_data, _STIXBase):
@ -44,25 +46,25 @@ def _add(store, stix_data=None, allow_custom=False):
if stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom=allow_custom)
_add(store, stix_obj, allow_custom=allow_custom, version=version)
else:
# adding a json STIX object
store._data[stix_data["id"]] = stix_data
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data, allow_custom=allow_custom)
stix_data = parse(stix_data, allow_custom=allow_custom, version=version)
if stix_data["type"] == "bundle":
# recurse on each STIX object in bundle
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom=allow_custom)
_add(store, stix_obj, allow_custom=allow_custom, version=version)
else:
_add(store, stix_data)
_add(store, stix_data, allow_custom=allow_custom, version=version)
elif isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj, allow_custom=allow_custom)
_add(store, stix_obj, allow_custom=allow_custom, version=version)
else:
raise TypeError("stix_data must be a STIX object (or list of), JSON formatted STIX (or list of), or a JSON formatted STIX bundle")
@ -81,6 +83,8 @@ class MemoryStore(DataStore):
stix_data (list OR dict OR STIX object): STIX content to be added
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Attributes:
_data (dict): the in-memory dict that holds STIX objects
@ -88,17 +92,18 @@ class MemoryStore(DataStore):
sink (MemorySink): MemorySink
"""
def __init__(self, stix_data=None, allow_custom=False):
super(MemoryStore, self).__init__()
def __init__(self, stix_data=None, allow_custom=False, version=None):
self._data = {}
if stix_data:
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
self.source = MemorySource(stix_data=self._data, _store=True, allow_custom=allow_custom)
self.sink = MemorySink(stix_data=self._data, _store=True, allow_custom=allow_custom)
super(MemoryStore, self).__init__(
source=MemorySource(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True),
sink=MemorySink(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True)
)
def save_to_file(self, file_path, allow_custom=False):
def save_to_file(self, *args, **kwargs):
"""Write SITX objects from in-memory dictionary to JSON file, as a STIX
Bundle.
@ -108,9 +113,9 @@ class MemoryStore(DataStore):
not. Default: False.
"""
return self.sink.save_to_file(file_path=file_path, allow_custom=allow_custom)
return self.sink.save_to_file(*args, **kwargs)
def load_from_file(self, file_path, allow_custom=False):
def load_from_file(self, *args, **kwargs):
"""Load STIX data from JSON file.
File format is expected to be a single JSON
@ -120,9 +125,11 @@ class MemoryStore(DataStore):
file_path (str): file path to load STIX data from
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
return self.source.load_from_file(file_path=file_path, allow_custom=allow_custom)
return self.source.load_from_file(*args, **kwargs)
class MemorySink(DataSink):
@ -146,17 +153,17 @@ class MemorySink(DataSink):
a MemorySource
"""
def __init__(self, stix_data=None, _store=False, allow_custom=False):
def __init__(self, stix_data=None, allow_custom=False, version=None, _store=False):
super(MemorySink, self).__init__()
self._data = {}
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
def add(self, stix_data, allow_custom=False):
_add(self, stix_data, allow_custom=allow_custom)
def add(self, stix_data, allow_custom=False, version=None):
_add(self, stix_data, allow_custom=allow_custom, version=version)
add.__doc__ = _add.__doc__
def save_to_file(self, file_path, allow_custom=False):
@ -190,24 +197,22 @@ class MemorySource(DataSource):
a MemorySink
"""
def __init__(self, stix_data=None, _store=False, allow_custom=False):
def __init__(self, stix_data=None, allow_custom=False, version=None, _store=False):
super(MemorySource, self).__init__()
self._data = {}
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
def get(self, stix_id, _composite_filters=None, allow_custom=False):
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object from in-memory dict via STIX ID.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(dict OR STIX object): STIX object that has the supplied
@ -227,7 +232,7 @@ class MemorySource(DataSource):
# if there are filters from the composite level, process full query
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
all_data = self.query(query=query, _composite_filters=_composite_filters)
if all_data:
# reduce to most recent version
@ -237,7 +242,7 @@ class MemorySource(DataSource):
else:
return None
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve STIX objects from in-memory dict via STIX ID, all versions of it
Note: Since Memory sources/sinks don't handle multiple versions of a
@ -245,10 +250,8 @@ class MemorySource(DataSource):
Args:
stix_id (str): The STIX ID of the STIX 2 object to retrieve.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that has the supplied ID. As the
@ -257,9 +260,9 @@ class MemorySource(DataSource):
is returned in the same form as it as added
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters, allow_custom=allow_custom)]
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None, allow_custom=False):
def query(self, query=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
@ -268,10 +271,8 @@ class MemorySource(DataSource):
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
_composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that matches the supplied
@ -284,7 +285,7 @@ class MemorySource(DataSource):
query = set()
else:
if not isinstance(query, list):
# make sure dont make set from a Filter object,
# make sure don't make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = [query]
query = set(query)
@ -300,8 +301,8 @@ class MemorySource(DataSource):
return all_data
def load_from_file(self, file_path, allow_custom=False):
def load_from_file(self, file_path, allow_custom=False, version=None):
file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r"))
_add(self, stix_data, allow_custom=allow_custom)
_add(self, stix_data, allow_custom=allow_custom, version=version)
load_from_file.__doc__ = MemoryStore.load_from_file.__doc__

View File

@ -1,5 +1,5 @@
"""
Python STIX 2.x TaxiiCollectionStore
Python STIX 2.x TAXIICollectionStore
"""
from stix2.base import _STIXBase
@ -20,9 +20,10 @@ class TAXIICollectionStore(DataStore):
collection (taxii2.Collection): TAXII Collection instance
"""
def __init__(self, collection):
super(TAXIICollectionStore, self).__init__()
self.source = TAXIICollectionSource(collection)
self.sink = TAXIICollectionSink(collection)
super(TAXIICollectionStore, self).__init__(
source=TAXIICollectionSource(collection),
sink=TAXIICollectionSink(collection)
)
class TAXIICollectionSink(DataSink):
@ -37,7 +38,7 @@ class TAXIICollectionSink(DataSink):
super(TAXIICollectionSink, self).__init__()
self.collection = collection
def add(self, stix_data, allow_custom=False):
def add(self, stix_data, allow_custom=False, version=None):
"""Add/push STIX content to TAXII Collection endpoint
Args:
@ -46,6 +47,8 @@ class TAXIICollectionSink(DataSink):
json encoded string, or list of any of the following
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if isinstance(stix_data, _STIXBase):
@ -62,11 +65,11 @@ class TAXIICollectionSink(DataSink):
elif isinstance(stix_data, list):
# adding list of something - recurse on each
for obj in stix_data:
self.add(obj, allow_custom=allow_custom)
self.add(obj, allow_custom=allow_custom, version=version)
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data, allow_custom=allow_custom)
stix_data = parse(stix_data, allow_custom=allow_custom, version=version)
if stix_data["type"] == "bundle":
bundle = dict(stix_data)
else:
@ -90,16 +93,18 @@ class TAXIICollectionSource(DataSource):
super(TAXIICollectionSource, self).__init__()
self.collection = collection
def get(self, stix_id, _composite_filters=None, allow_custom=False):
def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from local/remote STIX Collection
endpoint.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(STIX object): STIX object that has the supplied STIX ID.
@ -121,7 +126,7 @@ class TAXIICollectionSource(DataSource):
stix_obj = list(apply_common_filters(stix_objs, query))
if len(stix_obj):
stix_obj = parse(stix_obj[0], allow_custom=allow_custom)
stix_obj = parse(stix_obj[0], allow_custom=allow_custom, version=version)
if stix_obj.id != stix_id:
# check - was added to handle erroneous TAXII servers
stix_obj = None
@ -130,16 +135,18 @@ class TAXIICollectionSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None):
"""Retrieve STIX object from local/remote TAXII Collection
endpoint, all versions of it
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
composite_filters (set): set of filters passed from the parent
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(see query() as all_versions() is just a wrapper)
@ -151,17 +158,17 @@ class TAXIICollectionSource(DataSource):
Filter("match[version]", "=", "all")
]
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
all_data = self.query(query=query, allow_custom=allow_custom, _composite_filters=_composite_filters)
# parse STIX objects from TAXII returned json
all_data = [parse(stix_obj) for stix_obj in all_data]
all_data = [parse(stix_obj, allow_custom=allow_custom, version=version) for stix_obj in all_data]
# check - was added to handle erroneous TAXII servers
all_data_clean = [stix_obj for stix_obj in all_data if stix_obj.id == stix_id]
return all_data_clean
def query(self, query=None, _composite_filters=None, allow_custom=False):
def query(self, query=None, allow_custom=False, version=None, _composite_filters=None):
"""Search and retreive STIX objects based on the complete query
A "complete query" includes the filters from the query, the filters
@ -170,10 +177,12 @@ class TAXIICollectionSource(DataSource):
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
_composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
Returns:
(list): list of STIX objects that matches the supplied
@ -200,7 +209,7 @@ class TAXIICollectionSource(DataSource):
taxii_filters = self._parse_taxii_filters(query)
# query TAXII collection
all_data = self.collection.get_objects(filters=taxii_filters, allow_custom=allow_custom)["objects"]
all_data = self.collection.get_objects(filters=taxii_filters)["objects"]
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
@ -209,7 +218,7 @@ class TAXIICollectionSource(DataSource):
all_data = list(apply_common_filters(all_data, query))
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom, version=version) for stix_obj_dict in all_data]
return stix_objs

View File

@ -1,8 +1,9 @@
import json
import pytest
import stix2
EXPECTED_BUNDLE = """{
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000004",
@ -41,6 +42,44 @@ EXPECTED_BUNDLE = """{
]
}"""
EXPECTED_BUNDLE_DICT = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000004",
"spec_version": "2.0",
"objects": [
{
"type": "indicator",
"id": "indicator--00000000-0000-0000-0000-000000000001",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"valid_from": "2017-01-01T12:34:56Z",
"labels": [
"malicious-activity"
]
},
{
"type": "malware",
"id": "malware--00000000-0000-0000-0000-000000000002",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"name": "Cryptolocker",
"labels": [
"ransomware"
]
},
{
"type": "relationship",
"id": "relationship--00000000-0000-0000-0000-000000000003",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"relationship_type": "indicates",
"source_ref": "indicator--01234567-89ab-cdef-0123-456789abcdef",
"target_ref": "malware--fedcba98-7654-3210-fedc-ba9876543210"
}
]
}
def test_empty_bundle():
bundle = stix2.Bundle()
@ -82,10 +121,17 @@ def test_bundle_with_wrong_spec_version():
assert str(excinfo.value) == "Invalid value for Bundle 'spec_version': must equal '2.0'."
def test_create_bundle(indicator, malware, relationship):
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.Bundle(objects=[indicator, malware, relationship])
assert str(bundle) == EXPECTED_BUNDLE
assert bundle.serialize(pretty=True) == EXPECTED_BUNDLE
def test_create_bundle2(indicator, malware, relationship):
bundle = stix2.Bundle(objects=[indicator, malware, relationship])
assert json.loads(bundle.serialize()) == EXPECTED_BUNDLE_DICT
def test_create_bundle_with_positional_args(indicator, malware, relationship):

View File

@ -1,9 +1,9 @@
import pytest
from taxii2client import Collection
from stix2 import Filter, MemorySource
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2 import Filter, MemorySink, MemorySource
from stix2.sources import (CompositeDataSource, DataSink, DataSource, make_id,
taxii)
from stix2.sources.filters import apply_common_filters
from stix2.utils import deduplicate
@ -20,11 +20,6 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
@pytest.fixture
def ds():
return DataSource()
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -127,21 +122,11 @@ STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_abstract_class_smoke():
ds1 = DataSource()
ds2 = DataSink()
ds3 = DataStore(source=ds1, sink=ds2)
with pytest.raises(TypeError):
DataSource()
with pytest.raises(NotImplementedError):
ds3.add(None)
with pytest.raises(NotImplementedError):
ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
with pytest.raises(TypeError):
DataSink()
def test_ds_taxii(collection):
@ -177,7 +162,8 @@ def test_parse_taxii_filters():
assert taxii_filters == expected_params
def test_add_get_remove_filter(ds):
def test_add_get_remove_filter():
ds = taxii.TAXIICollectionSource(collection)
# First 3 filters are valid, remaining properties are erroneous in some way
valid_filters = [
@ -226,7 +212,7 @@ def test_add_get_remove_filter(ds):
ds.filters.update(valid_filters)
def test_apply_common_filters(ds):
def test_apply_common_filters():
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
@ -374,35 +360,35 @@ def test_apply_common_filters(ds):
assert len(resp) == 0
def test_filters0(ds):
def test_filters0():
# "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters1(ds):
def test_filters1():
# "Return any object modified after 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters2(ds):
def test_filters2():
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters3(ds):
def test_filters3():
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]))
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters4(ds):
def test_filters4():
# Assert invalid Filter cannot be created
with pytest.raises(ValueError) as excinfo:
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
@ -410,21 +396,21 @@ def test_filters4(ds):
"for specified property: 'modified'")
def test_filters5(ds):
def test_filters5():
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters6(ds):
def test_filters6():
# Test filtering on non-common property
resp = list(apply_common_filters(STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters7(ds):
def test_filters7():
# Test filtering on embedded property
stix_objects = list(STIX_OBJS2) + [{
"type": "observed-data",
@ -463,7 +449,7 @@ def test_filters7(ds):
assert len(resp) == 1
def test_deduplicate(ds):
def test_deduplicate():
unique = deduplicate(STIX_OBJS1)
# Only 3 objects are unique
@ -483,14 +469,14 @@ def test_deduplicate(ds):
def test_add_remove_composite_datasource():
cds = CompositeDataSource()
ds1 = DataSource()
ds2 = DataSource()
ds3 = DataSink()
ds1 = MemorySource()
ds2 = MemorySource()
ds3 = MemorySink()
with pytest.raises(TypeError) as excinfo:
cds.add_data_sources([ds1, ds2, ds1, ds3])
assert str(excinfo.value) == ("DataSource (to be added) is not of type "
"stix2.DataSource. DataSource type is '<class 'stix2.sources.DataSink'>'")
"stix2.DataSource. DataSource type is '<class 'stix2.sources.memory.MemorySink'>'")
cds.add_data_sources([ds1, ds2, ds1])
@ -506,29 +492,58 @@ def test_composite_datasource_operations():
objects=STIX_OBJS1,
spec_version="2.0",
type="bundle")
cds = CompositeDataSource()
ds1 = MemorySource(stix_data=BUNDLE1)
ds2 = MemorySource(stix_data=STIX_OBJS2)
cds1 = CompositeDataSource()
ds1_1 = MemorySource(stix_data=BUNDLE1)
ds1_2 = MemorySource(stix_data=STIX_OBJS2)
cds.add_data_sources([ds1, ds2])
cds2 = CompositeDataSource()
ds2_1 = MemorySource(stix_data=BUNDLE1)
ds2_2 = MemorySource(stix_data=STIX_OBJS2)
indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
cds1.add_data_sources([ds1_1, ds1_2])
cds2.add_data_sources([ds2_1, ds2_2])
indicators = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
# In STIX_OBJS2 changed the 'modified' property to a later time...
assert len(indicators) == 2
indicator = cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
cds1.add_data_sources([cds2])
indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["type"] == "indicator"
query = [
query1 = [
Filter("type", "=", "indicator")
]
results = cds.query(query)
query2 = [
Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z")
]
cds1.filters.update(query2)
results = cds1.query(query1)
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["type"] == "indicator"
# There is only one indicator with different ID. Since we use the same data
# when deduplicated, only two indicators (one with different modified).
results = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert len(results) == 2
# Since we have filters already associated with our CompositeSource providing
# nothing returns the same as cds1.query(query1) (the associated query is query2)
results = cds1.query([])
assert len(results) == 3

View File

@ -340,7 +340,7 @@ def test_filesystem_object_with_custom_property(fs_store):
fs_store.add(camp, True)
camp_r = fs_store.get(camp.id, True)
camp_r = fs_store.get(camp.id, allow_custom=True)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -352,9 +352,9 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store):
allow_custom=True)
bundle = Bundle(camp, allow_custom=True)
fs_store.add(bundle, True)
fs_store.add(bundle, allow_custom=True)
camp_r = fs_store.get(camp.id, True)
camp_r = fs_store.get(camp.id, allow_custom=True)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -367,9 +367,9 @@ def test_filesystem_custom_object(fs_store):
pass
newobj = NewObj(property1='something')
fs_store.add(newobj, True)
fs_store.add(newobj, allow_custom=True)
newobj_r = fs_store.get(newobj.id, True)
newobj_r = fs_store.get(newobj.id, allow_custom=True)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'

View File

@ -62,4 +62,15 @@ def test_parse_no_type():
"identity_class": "individual"
}""")
def test_identity_with_custom():
identity = stix2.Identity(
name="John Smith",
identity_class="individual",
custom_properties={'x_foo': 'bar'}
)
assert identity.x_foo == "bar"
assert "x_foo" in identity.object_properties()
# TODO: Add other examples

View File

@ -254,7 +254,7 @@ def test_memory_store_object_with_custom_property(mem_store):
mem_store.add(camp, True)
camp_r = mem_store.get(camp.id, True)
camp_r = mem_store.get(camp.id)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -268,7 +268,7 @@ def test_memory_store_object_with_custom_property_in_bundle(mem_store):
bundle = Bundle(camp, allow_custom=True)
mem_store.add(bundle, True)
bundle_r = mem_store.get(bundle.id, True)
bundle_r = mem_store.get(bundle.id)
camp_r = bundle_r['objects'][0]
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
@ -284,6 +284,6 @@ def test_memory_store_custom_object(mem_store):
newobj = NewObj(property1='something')
mem_store.add(newobj, True)
newobj_r = mem_store.get(newobj.id, True)
newobj_r = mem_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'