Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into stix2.1

master
Emmanuelle Vargas-Gonzalez 2018-11-01 07:57:09 -04:00
commit eff5369670
16 changed files with 275 additions and 118 deletions

View File

@ -1,6 +1,18 @@
CHANGELOG
=========
1.0.3 - 2018-10-31
* #187 Pickle proof objects
* #181 Improvements to error handling for datastores
* #184, #192 Fix "pretty" JSON serialization problems
* #195 Fix wrong property name for raster-image-ext
* #201 UUIDv4 enforcement on identifiers
* #203 New filter option "contains" for datastores
* #207 Add documentation on patterning
* #213 Python 3.7 support
* #214 Support for multiple object versions in MemoryStore
1.0.2 - 2018-05-18
* Fixes bugs when using allow_custom (#176, #179).

View File

@ -34,8 +34,8 @@ project = 'stix2'
copyright = '2017, OASIS Open'
author = 'OASIS Open'
version = '1.0.2'
release = '1.0.2'
version = '1.0.3'
release = '1.0.3'
language = None
exclude_patterns = ['_build', '_templates', 'Thumbs.db', '.DS_Store', 'guide/.ipynb_checkpoints']

View File

@ -62,11 +62,11 @@
"\n",
"\n",
"### Memory API\n",
"A note on adding and retreiving STIX content to the Memory suite: As mentioned, under the hood the Memory suite is an internal, in-memory dictionary. STIX content that is to be added can be in the following forms: python-stix2 objects, (Python) dictionaries (of valid STIX objects or Bundles), JSON-encoded strings (of valid STIX objects or Bundles), or a (Python) list of any of the previously listed types. [MemoryStore](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore) actually stores STIX content either as python-stix2 objects or as (Python) dictionaries, reducing and converting any of the aforementioned types to one of those. Additionally, whatever form the STIX object is stored as, is how it will be returned when retrieved. python-stix2 objects, and json-encoded strings (of STIX content) are stored as python-stix2 objects, while (Python) dictionaries (of STIX objects) are stored as (Python) dictionaries.\n",
"A note on adding and retreiving STIX content to the Memory suite: As mentioned, under the hood the Memory suite is an internal, in-memory dictionary. STIX content that is to be added can be in the following forms: python-stix2 objects, (Python) dictionaries (of valid STIX objects or Bundles), JSON-encoded strings (of valid STIX objects or Bundles), or a (Python) list of any of the previously listed types. [MemoryStore](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore) actually stores and retrieves STIX content as python-stix2 objects.\n",
"\n",
"A note on [load_from_file()](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore.load_from_file): For [load_from_file()](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore.load_from_file), STIX content is assumed to be in JSON form within the file, as an individual STIX object or in a Bundle. When the JSON is loaded, the STIX objects are parsed into python-stix2 objects before being stored in the in-memory dictionary.\n",
"\n",
"A note on [save_to_file()](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore.save_to_file): This method dumps all STIX content that is in the [MemoryStore](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore) to the specified file. The file format will be JSON, and the STIX content will be within a STIX Bundle. Note also that the output form will be a JSON STIX Bundle regardless of the form that the individual STIX objects are stored in (i.e. supplied to) the [MemoryStore](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore). \n",
"A note on [save_to_file()](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore.save_to_file): This method dumps all STIX content that is in the [MemoryStore](../api/datastore/stix2.datastore.memory.rst#stix2.datastore.memory.MemoryStore) to the specified file. The file format will be JSON, and the STIX content will be within a STIX Bundle.\n",
"\n",
"### Memory Examples\n",
"\n",

View File

@ -230,7 +230,7 @@
"metadata": {},
"source": [
"### How parsing works\n",
"If the ``version`` positional argument is not provided. The data will be parsed using the latest version of STIX 2.X supported by the `stix2` library.\n",
"If the ``version`` positional argument is not provided. The library will make the best attempt using the \"spec_version\" property found on a Bundle, SDOs, and SROs.\n",
"\n",
"You can lock your [parse()](../api/stix2.core.rst#stix2.core.parse) method to a specific STIX version by:"
]

View File

@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.2
current_version = 1.0.3
commit = True
tag = True

View File

@ -4,7 +4,7 @@ import io
import json
import os
from stix2 import Bundle, v20
from stix2 import v20, v21
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
@ -86,7 +86,7 @@ class FileSystemSink(DataSink):
# Assuming future specs will allow multiple SDO/SROs
# versions in a single bundle we won't need to check this
# and just use the latest supported Bundle version.
stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom)
stix_obj = v21.Bundle(stix_obj, allow_custom=self.allow_custom)
else:
stix_obj = v20.Bundle(stix_obj, allow_custom=self.allow_custom)

View File

@ -120,8 +120,11 @@ def apply_common_filters(stix_objs, query):
Supports only STIX 2.0 common property properties.
Args:
stix_objs (list): list of STIX objects to apply the query to
query (set): set of filters (combined form complete query)
stix_objs (iterable): iterable of STIX objects to apply the query to
query (non-iterator iterable): iterable of filters. Can't be an
iterator (e.g. generator iterators won't work), since this is
used in an inner loop of a nested loop. So we require the ability
to traverse the filters repeatedly.
Yields:
STIX objects that successfully evaluate against the query.

View File

@ -1,29 +1,20 @@
"""
Python STIX 2.0 Memory Source/Sink
TODO:
Use deduplicate() calls only when memory corpus is dirty (been added to)
can save a lot of time for successive queries
Note:
Not worrying about STIX versioning. The in memory STIX data at anytime
will only hold one version of a STIX object. As such, when save() is called,
the single versions of all the STIX objects are what is written to file.
"""
import collections
import io
import itertools
import json
import os
from stix2 import Bundle, v20
from stix2 import v20, v21
from stix2.base import _STIXBase
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.datastore.filters import FilterSet, apply_common_filters
def _add(store, stix_data=None):
def _add(store, stix_data, allow_custom=True, version=None):
"""Add STIX objects to MemoryStore/Sink.
Adds STIX objects to an in-memory dictionary for fast lookup.
@ -31,25 +22,82 @@ def _add(store, stix_data=None):
Args:
stix_data (list OR dict OR STIX object): STIX objects to be added
version (str): Which STIX2 version to lock the parser to. (e.g. "2.0",
"2.1"). If None, the library makes the best effort to figure
out the spec representation of the object.
"""
if isinstance(stix_data, collections.Mapping):
if stix_data['type'] == 'bundle':
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data.get('objects', []):
_add(store, stix_obj)
else:
# adding a json STIX object
store._data[stix_data['id']] = stix_data
elif isinstance(stix_data, list):
if isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj)
_add(store, stix_obj, allow_custom, version)
elif stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom, version)
else:
raise TypeError("stix_data expected to be a python-stix2 object (or list of), JSON formatted STIX (or list of),"
" or a JSON formatted STIX bundle. stix_data was of type: " + str(type(stix_data)))
# Adding a single non-bundle object
if isinstance(stix_data, _STIXBase):
stix_obj = stix_data
else:
stix_obj = parse(stix_data, allow_custom, version)
# Map ID directly to the object, if it is a marking. Otherwise,
# map to a family, so we can track multiple versions.
if _is_marking(stix_obj):
store._data[stix_obj.id] = stix_obj
else:
if stix_obj.id in store._data:
obj_family = store._data[stix_obj.id]
else:
obj_family = _ObjectFamily()
store._data[stix_obj.id] = obj_family
obj_family.add(stix_obj)
def _is_marking(obj_or_id):
"""Determines whether the given object or object ID is/is for a marking
definition.
:param obj_or_id: A STIX object or object ID as a string.
:return: True if a marking definition, False otherwise.
"""
if isinstance(obj_or_id, _STIXBase):
id_ = obj_or_id.id
else:
id_ = obj_or_id
return id_.startswith("marking-definition--")
class _ObjectFamily(object):
"""
An internal implementation detail of memory sources/sinks/stores.
Represents a "family" of STIX objects: all objects with a particular
ID. (I.e. all versions.) The latest version is also tracked so that it
can be obtained quickly.
"""
def __init__(self):
self.all_versions = {}
self.latest_version = None
def add(self, obj):
self.all_versions[obj.modified] = obj
if self.latest_version is None or \
obj.modified > self.latest_version.modified:
self.latest_version = obj
def __str__(self):
return "<<{}; latest={}>>".format(self.all_versions,
self.latest_version.modified)
def __repr__(self):
return str(self)
class MemoryStore(DataStoreMixin):
@ -77,7 +125,7 @@ class MemoryStore(DataStoreMixin):
self._data = {}
if stix_data:
_add(self, stix_data)
_add(self, stix_data, allow_custom)
super(MemoryStore, self).__init__(
source=MemorySource(stix_data=self._data, allow_custom=allow_custom, _store=True),
@ -132,24 +180,30 @@ class MemorySink(DataSink):
"""
def __init__(self, stix_data=None, allow_custom=True, _store=False):
super(MemorySink, self).__init__()
self._data = {}
self.allow_custom = allow_custom
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data)
else:
self._data = {}
if stix_data:
_add(self, stix_data, allow_custom)
def add(self, stix_data):
_add(self, stix_data)
_add(self, stix_data, self.allow_custom)
add.__doc__ = _add.__doc__
def save_to_file(self, path, encoding='utf-8'):
path = os.path.abspath(path)
all_objs = list(self._data.values())
all_objs = list(itertools.chain.from_iterable(
value.all_versions.values() if isinstance(value, _ObjectFamily)
else [value]
for value in self._data.values()
))
if any('spec_version' in x for x in all_objs):
bundle = Bundle(all_objs, allow_custom=self.allow_custom)
bundle = v21.Bundle(all_objs, allow_custom=self.allow_custom)
else:
bundle = v20.Bundle(all_objs, allow_custom=self.allow_custom)
@ -190,13 +244,14 @@ class MemorySource(DataSource):
"""
def __init__(self, stix_data=None, allow_custom=True, _store=False):
super(MemorySource, self).__init__()
self._data = {}
self.allow_custom = allow_custom
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data)
else:
self._data = {}
if stix_data:
_add(self, stix_data, allow_custom)
def get(self, stix_id, _composite_filters=None):
"""Retrieve STIX object from in-memory dict via STIX ID.
@ -207,32 +262,29 @@ class MemorySource(DataSource):
CompositeDataSource, not user supplied
Returns:
(dict OR STIX object): STIX object that has the supplied
ID. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
as they are supplied (either as python dictionary or STIX object), it
is returned in the same form as it as added
(STIX object): STIX object that has the supplied ID.
"""
if _composite_filters is None:
# if get call is only based on 'id', no need to search, just retrieve from dict
try:
stix_obj = self._data[stix_id]
except KeyError:
stix_obj = None
return stix_obj
stix_obj = None
# if there are filters from the composite level, process full query
query = [Filter('id', '=', stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
if all_data:
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
return stix_obj
if _is_marking(stix_id):
stix_obj = self._data.get(stix_id)
else:
return None
object_family = self._data.get(stix_id)
if object_family:
stix_obj = object_family.latest_version
if stix_obj:
all_filters = list(
itertools.chain(
_composite_filters or [],
self.filters
)
)
stix_obj = next(apply_common_filters([stix_obj], all_filters), None)
return stix_obj
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve STIX objects from in-memory dict via STIX ID, all versions
@ -247,14 +299,33 @@ class MemorySource(DataSource):
the parent CompositeDataSource, not user supplied
Returns:
(list): list of STIX objects that has the supplied ID. As the
MemoryStore(i.e. MemorySink) adds STIX objects to memory as they
are supplied (either as python dictionary or STIX object), it
is returned in the same form as it as added
(list): list of STIX objects that have the supplied ID.
"""
results = []
stix_objs_to_filter = None
if _is_marking(stix_id):
stix_obj = self._data.get(stix_id)
if stix_obj:
stix_objs_to_filter = [stix_obj]
else:
object_family = self._data.get(stix_id)
if object_family:
stix_objs_to_filter = object_family.all_versions.values()
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
if stix_objs_to_filter:
all_filters = list(
itertools.chain(
_composite_filters or [],
self.filters
)
)
results.extend(
apply_common_filters(stix_objs_to_filter, all_filters)
)
return results
def query(self, query=None, _composite_filters=None):
"""Search and retrieve STIX objects based on the complete query.
@ -269,10 +340,7 @@ class MemorySource(DataSource):
the CompositeDataSource, not user supplied
Returns:
(list): list of STIX objects that matches the supplied
query. As the MemoryStore(i.e. MemorySink) adds STIX objects
to memory as they are supplied (either as python dictionary or
STIX object), it is returned in the same form as it as added.
(list): list of STIX objects that match the supplied query.
"""
query = FilterSet(query)
@ -283,17 +351,20 @@ class MemorySource(DataSource):
if _composite_filters:
query.add(_composite_filters)
all_objs = itertools.chain.from_iterable(
value.all_versions.values() if isinstance(value, _ObjectFamily)
else [value]
for value in self._data.values()
)
# Apply STIX common property filters.
all_data = list(apply_common_filters(self._data.values(), query))
all_data = list(apply_common_filters(all_objs, query))
return all_data
def load_from_file(self, file_path):
stix_data = json.load(open(os.path.abspath(file_path), 'r'))
with open(os.path.abspath(file_path), "r") as f:
stix_data = json.load(f)
if stix_data['type'] == 'bundle':
for stix_obj in stix_data['objects']:
_add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom))
else:
_add(self, stix_data=parse(stix_data, allow_custom=self.allow_custom))
_add(self, stix_data, self.allow_custom)
load_from_file.__doc__ = MemoryStore.load_from_file.__doc__

View File

@ -1,7 +1,7 @@
"""Python STIX 2.x TAXIICollectionStore"""
from requests.exceptions import HTTPError
from stix2 import Bundle, v20
from stix2 import v20, v21
from stix2.base import _STIXBase
from stix2.core import parse
from stix2.datastore import (
@ -95,7 +95,7 @@ class TAXIICollectionSink(DataSink):
bundle = stix_data.serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
@ -105,7 +105,7 @@ class TAXIICollectionSink(DataSink):
bundle = parse(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
@ -122,7 +122,7 @@ class TAXIICollectionSink(DataSink):
bundle = stix_data.serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)

View File

@ -2,7 +2,9 @@ import pytest
from stix2.datastore import CompositeDataSource, make_id
from stix2.datastore.filters import Filter
from stix2.datastore.memory import MemorySink, MemorySource
from stix2.datastore.memory import MemorySink, MemorySource, MemoryStore
from stix2.utils import parse_into_datetime
from stix2.v20.common import TLP_GREEN
def test_add_remove_composite_datasource():
@ -48,14 +50,14 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
indicators = cds1.all_versions("indicator--00000000-0000-4000-8000-000000000001")
# In STIX_OBJS2 changed the 'modified' property to a later time...
assert len(indicators) == 2
assert len(indicators) == 3
cds1.add_data_sources([cds2])
indicator = cds1.get("indicator--00000000-0000-4000-8000-000000000001")
assert indicator["id"] == "indicator--00000000-0000-4000-8000-000000000001"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
query1 = [
@ -72,20 +74,80 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
assert len(results) == 4
indicator = cds1.get("indicator--00000000-0000-4000-8000-000000000001")
assert indicator["id"] == "indicator--00000000-0000-4000-8000-000000000001"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
# There is only one indicator with different ID. Since we use the same data
# when deduplicated, only two indicators (one with different modified).
results = cds1.all_versions("indicator--00000000-0000-4000-8000-000000000001")
assert len(results) == 2
assert len(results) == 3
# Since we have filters already associated with our CompositeSource providing
# nothing returns the same as cds1.query(query1) (the associated query is query2)
results = cds1.query([])
assert len(results) == 3
assert len(results) == 4
def test_source_markings():
msrc = MemorySource(TLP_GREEN)
assert msrc.get(TLP_GREEN.id) == TLP_GREEN
assert msrc.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert msrc.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
def test_sink_markings():
# just make sure there is no crash
msink = MemorySink(TLP_GREEN)
msink.add(TLP_GREEN)
def test_store_markings():
mstore = MemoryStore(TLP_GREEN)
assert mstore.get(TLP_GREEN.id) == TLP_GREEN
assert mstore.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert mstore.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
def test_source_mixed(indicator):
msrc = MemorySource([TLP_GREEN, indicator])
assert msrc.get(TLP_GREEN.id) == TLP_GREEN
assert msrc.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert msrc.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
assert msrc.get(indicator.id) == indicator
assert msrc.all_versions(indicator.id) == [indicator]
assert msrc.query(Filter("id", "=", indicator.id)) == [indicator]
all_objs = msrc.query()
assert TLP_GREEN in all_objs
assert indicator in all_objs
assert len(all_objs) == 2
def test_sink_mixed(indicator):
# just make sure there is no crash
msink = MemorySink([TLP_GREEN, indicator])
msink.add([TLP_GREEN, indicator])
def test_store_mixed(indicator):
mstore = MemoryStore([TLP_GREEN, indicator])
assert mstore.get(TLP_GREEN.id) == TLP_GREEN
assert mstore.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert mstore.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
assert mstore.get(indicator.id) == indicator
assert mstore.all_versions(indicator.id) == [indicator]
assert mstore.query(Filter("id", "=", indicator.id)) == [indicator]
all_objs = mstore.query()
assert TLP_GREEN in all_objs
assert indicator in all_objs
assert len(all_objs) == 2

View File

@ -8,6 +8,7 @@ from stix2.datastore import make_id
from stix2.v20 import (
Bundle, Campaign, CustomObject, Identity, Indicator, Malware, Relationship,
)
from stix2.utils import parse_into_datetime
from .constants import (
CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,
@ -171,7 +172,7 @@ def test_memory_store_all_versions(mem_store):
))
resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001")
assert len(resp) == 1 # MemoryStore can only store 1 version of each object
assert len(resp) == 3
def test_memory_store_query(mem_store):
@ -183,25 +184,27 @@ def test_memory_store_query(mem_store):
def test_memory_store_query_single_filter(mem_store):
query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001')
resp = mem_store.query(query)
assert len(resp) == 1
assert len(resp) == 2
def test_memory_store_query_empty_query(mem_store):
resp = mem_store.query()
# sort since returned in random order
resp = sorted(resp, key=lambda k: k['id'])
assert len(resp) == 2
resp = sorted(resp, key=lambda k: (k['id'], k['modified']))
assert len(resp) == 3
assert resp[0]['id'] == 'indicator--00000000-0000-4000-8000-000000000001'
assert resp[0]['modified'] == '2017-01-27T13:49:53.936Z'
assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000002'
assert resp[1]['modified'] == '2017-01-27T13:49:53.935Z'
assert resp[0]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z')
assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000001'
assert resp[1]['modified'] == parse_into_datetime('2017-01-27T13:49:53.936Z')
assert resp[2]['id'] == 'indicator--00000000-0000-4000-8000-000000000002'
assert resp[2]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z')
def test_memory_store_query_multiple_filters(mem_store):
mem_store.source.filters.add(Filter('type', '=', 'indicator'))
query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001')
resp = mem_store.query(query)
assert len(resp) == 1
assert len(resp) == 2
def test_memory_store_save_load_file(mem_store, fs_mem_store):
@ -222,12 +225,8 @@ def test_memory_store_save_load_file(mem_store, fs_mem_store):
def test_memory_store_add_invalid_object(mem_store):
ind = ('indicator', IND1) # tuple isn't valid
with pytest.raises(TypeError) as excinfo:
with pytest.raises(TypeError):
mem_store.add(ind)
assert 'stix_data expected to be' in str(excinfo.value)
assert 'a python-stix2 object' in str(excinfo.value)
assert 'JSON formatted STIX' in str(excinfo.value)
assert 'JSON formatted STIX bundle' in str(excinfo.value)
def test_memory_store_object_with_custom_property(mem_store):

View File

@ -16,8 +16,10 @@ COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c
class MockTAXIICollectionEndpoint(Collection):
"""Mock for taxii2_client.TAXIIClient"""
def __init__(self, url, **kwargs):
super(MockTAXIICollectionEndpoint, self).__init__(url, **kwargs)
def __init__(self, url, collection_info):
super(MockTAXIICollectionEndpoint, self).__init__(
url, collection_info=collection_info
)
self.objects = []
def add_objects(self, bundle):
@ -69,7 +71,7 @@ class MockTAXIICollectionEndpoint(Collection):
@pytest.fixture
def collection(stix_objs1):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, **{
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
"title": "Writable Collection",
"description": "This collection is a dropbox for submitting indicators",
@ -88,7 +90,7 @@ def collection(stix_objs1):
@pytest.fixture
def collection_no_rw_access(stix_objs1):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, **{
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
"title": "Not writeable or readable Collection",
"description": "This collection is a dropbox for submitting indicators",

View File

@ -131,7 +131,7 @@ def test_environment_functions():
# Get both versions of the object
resp = env.all_versions(INDICATOR_ID)
assert len(resp) == 1 # should be 2, but MemoryStore only keeps 1 version of objects
assert len(resp) == 2
# Get just the most recent version of the object
resp = env.get(INDICATOR_ID)

View File

@ -11,7 +11,7 @@ from .constants import MARKING_DEFINITION_ID
EXPECTED_TLP_MARKING_DEFINITION = """{
"type": "marking-definition",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"created": "2017-01-20T00:00:00Z",
"created": "2017-01-20T00:00:00.000Z",
"definition_type": "tlp",
"definition": {
"tlp": "white"

View File

@ -1,6 +1,7 @@
"""STIX 2 Common Data Types and Properties."""
from collections import OrderedDict
import copy
from ..base import _STIXBase
from ..custom import _custom_marking_builder
@ -120,6 +121,13 @@ class MarkingDefinition(_STIXBase, _MarkingsMixin):
except KeyError:
raise ValueError("definition_type must be a valid marking type")
if marking_type == TLPMarking:
# TLP instances in the spec have millisecond precision unlike other markings
self._properties = copy.deepcopy(self._properties)
self._properties.update([
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
])
if not isinstance(kwargs['definition'], marking_type):
defn = _get_dict(kwargs['definition'])
kwargs['definition'] = marking_type(**defn)

View File

@ -1 +1 @@
__version__ = "1.0.2"
__version__ = "1.0.3"