Finish alignment of 2.1 components

master
Emmanuelle Vargas-Gonzalez 2018-11-01 08:17:34 -04:00
parent eff5369670
commit 5e5d10e7aa
10 changed files with 149 additions and 69 deletions

View File

@ -29,10 +29,9 @@ def parse(data, allow_custom=False, version=None):
allow_custom (bool): Whether to allow custom properties as well unknown
custom objects. Note that unknown custom objects cannot be parsed
into STIX objects, and will be returned as is. Default: False.
version (str): Only used for bundles. If the spec_version property is
missing, it is ambiguous what spec should be used to parse the
bundle. In this case, this version parameter gives the spec
version to use.
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property.
Returns:
An instantiated Python STIX object.
@ -59,30 +58,29 @@ def parse(data, allow_custom=False, version=None):
def dict_to_stix2(stix_dict, allow_custom=False, version=None):
"""convert dictionary to full python-stix2 object
Args:
stix_dict (dict): a python dictionary of a STIX object
that (presumably) is semantically correct to be parsed
into a full python-stix2 obj
allow_custom (bool): Whether to allow custom properties as well
unknown custom objects. Note that unknown custom objects cannot
be parsed into STIX objects, and will be returned as is.
Default: False.
version: Only used for bundles. If the spec_version property is
missing, it is ambiguous what spec should be used to parse the
bundle. In this case, this version parameter gives the spec
version to use.
Args:
stix_dict (dict): a python dictionary of a STIX object
that (presumably) is semantically correct to be parsed
into a full python-stix2 obj
allow_custom (bool): Whether to allow custom properties as well
unknown custom objects. Note that unknown custom objects cannot
be parsed into STIX objects, and will be returned as is.
Default: False.
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property.
Returns:
An instantiated Python STIX object
Returns:
An instantiated Python STIX object
Warnings:
'allow_custom=True' will allow for the return of any supplied STIX
dict(s) that cannot be found to map to any known STIX object types
(both STIX2 domain objects or defined custom STIX2 objects); NO
validation is done. This is done to allow the processing of
possibly unknown custom STIX objects (example scenario: I need to
query a third-party TAXII endpoint that could provide custom STIX
objects that I don't know about ahead of time)
Warnings:
'allow_custom=True' will allow for the return of any supplied STIX
dict(s) that cannot be found to map to any known STIX object types
(both STIX2 domain objects or defined custom STIX2 objects); NO
validation is done. This is done to allow the processing of
possibly unknown custom STIX objects (example scenario: I need to
query a third-party TAXII endpoint that could provide custom STIX
objects that I don't know about ahead of time)
"""
if 'type' not in stix_dict:
@ -130,11 +128,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
object being parsed. Use empty list if no valid refs are present.
allow_custom (bool): Whether to allow custom properties or not.
Default: False.
version (str): If the spec version is missing, the latest supported
stix2 version will be used to parse the observable object.
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will use the latest version.
Returns:
An instantiated Python STIX Cyber Observable object.
"""
obj = _get_dict(data)
# get deep copy since we are going modify the dict and might
@ -187,6 +186,7 @@ def _register_object(new_type, version=None):
new_type (class): A class to register in the Object map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if version:
v = 'v' + version.replace('.', '')
@ -205,6 +205,7 @@ def _register_marking(new_marking, version=None):
new_marking (class): A class to register in the Marking map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if version:
v = 'v' + version.replace('.', '')
@ -223,6 +224,7 @@ def _register_observable(new_observable, version=None):
new_observable (class): A class to register in the Observables map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if version:
v = 'v' + version.replace('.', '')
@ -243,6 +245,7 @@ def _register_observable_extension(observable, new_extension, version=None):
Extensions map.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
"""
if version:
v = 'v' + version.replace('.', '')

View File

@ -21,7 +21,11 @@ def _add(store, stix_data, allow_custom=True, version=None):
Recursive function, breaks down STIX Bundles and lists.
Args:
store: A MemoryStore, MemorySink or MemorySource object.
stix_data (list OR dict OR STIX object): STIX objects to be added
allow_custom (bool): Whether to allow custom properties as well unknown
custom objects. Note that unknown custom objects cannot be parsed
into STIX objects, and will be returned as is. Default: False.
version (str): Which STIX2 version to lock the parser to. (e.g. "2.0",
"2.1"). If None, the library makes the best effort to figure
out the spec representation of the object.
@ -93,8 +97,10 @@ class _ObjectFamily(object):
self.latest_version = obj
def __str__(self):
return "<<{}; latest={}>>".format(self.all_versions,
self.latest_version.modified)
return "<<{}; latest={}>>".format(
self.all_versions,
self.latest_version.modified,
)
def __repr__(self):
return str(self)
@ -278,8 +284,8 @@ class MemorySource(DataSource):
all_filters = list(
itertools.chain(
_composite_filters or [],
self.filters
)
self.filters,
),
)
stix_obj = next(apply_common_filters([stix_obj], all_filters), None)
@ -317,12 +323,12 @@ class MemorySource(DataSource):
all_filters = list(
itertools.chain(
_composite_filters or [],
self.filters
)
self.filters,
),
)
results.extend(
apply_common_filters(stix_objs_to_filter, all_filters)
apply_common_filters(stix_objs_to_filter, all_filters),
)
return results

View File

@ -5,10 +5,10 @@ import pytest
from stix2 import Filter, MemorySource, MemoryStore, properties
from stix2.datastore import make_id
from stix2.utils import parse_into_datetime
from stix2.v20 import (
Bundle, Campaign, CustomObject, Identity, Indicator, Malware, Relationship,
)
from stix2.utils import parse_into_datetime
from .constants import (
CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,

View File

@ -18,7 +18,7 @@ class MockTAXIICollectionEndpoint(Collection):
def __init__(self, url, collection_info):
super(MockTAXIICollectionEndpoint, self).__init__(
url, collection_info=collection_info
url, collection_info=collection_info,
)
self.objects = []
@ -80,7 +80,7 @@ def collection(stix_objs1):
"media_types": [
"application/vnd.oasis.stix+json; version=2.0",
],
}
},
)
mock.objects.extend(stix_objs1)
@ -99,7 +99,7 @@ def collection_no_rw_access(stix_objs1):
"media_types": [
"application/vnd.oasis.stix+json; version=2.0",
],
}
},
)
mock.objects.extend(stix_objs1)

View File

@ -2,7 +2,9 @@ import pytest
from stix2.datastore import CompositeDataSource, make_id
from stix2.datastore.filters import Filter
from stix2.datastore.memory import MemorySink, MemorySource
from stix2.datastore.memory import MemorySink, MemorySource, MemoryStore
from stix2.utils import parse_into_datetime
from stix2.v21.common import TLP_GREEN
def test_add_remove_composite_datasource():
@ -47,14 +49,14 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
indicators = cds1.all_versions("indicator--00000000-0000-4000-8000-000000000001")
# In STIX_OBJS2 changed the 'modified' property to a later time...
assert len(indicators) == 2
assert len(indicators) == 3
cds1.add_data_sources([cds2])
indicator = cds1.get("indicator--00000000-0000-4000-8000-000000000001")
assert indicator["id"] == "indicator--00000000-0000-4000-8000-000000000001"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
query1 = [
@ -71,20 +73,80 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2):
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
assert len(results) == 4
indicator = cds1.get("indicator--00000000-0000-4000-8000-000000000001")
assert indicator["id"] == "indicator--00000000-0000-4000-8000-000000000001"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z")
assert indicator["type"] == "indicator"
# There is only one indicator with different ID. Since we use the same data
# when deduplicated, only two indicators (one with different modified).
results = cds1.all_versions("indicator--00000000-0000-4000-8000-000000000001")
assert len(results) == 2
assert len(results) == 3
# Since we have filters already associated with our CompositeSource providing
# nothing returns the same as cds1.query(query1) (the associated query is query2)
results = cds1.query([])
assert len(results) == 3
assert len(results) == 4
def test_source_markings():
msrc = MemorySource(TLP_GREEN)
assert msrc.get(TLP_GREEN.id) == TLP_GREEN
assert msrc.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert msrc.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
def test_sink_markings():
# just make sure there is no crash
msink = MemorySink(TLP_GREEN)
msink.add(TLP_GREEN)
def test_store_markings():
mstore = MemoryStore(TLP_GREEN)
assert mstore.get(TLP_GREEN.id) == TLP_GREEN
assert mstore.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert mstore.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
def test_source_mixed(indicator):
msrc = MemorySource([TLP_GREEN, indicator])
assert msrc.get(TLP_GREEN.id) == TLP_GREEN
assert msrc.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert msrc.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
assert msrc.get(indicator.id) == indicator
assert msrc.all_versions(indicator.id) == [indicator]
assert msrc.query(Filter("id", "=", indicator.id)) == [indicator]
all_objs = msrc.query()
assert TLP_GREEN in all_objs
assert indicator in all_objs
assert len(all_objs) == 2
def test_sink_mixed(indicator):
# just make sure there is no crash
msink = MemorySink([TLP_GREEN, indicator])
msink.add([TLP_GREEN, indicator])
def test_store_mixed(indicator):
mstore = MemoryStore([TLP_GREEN, indicator])
assert mstore.get(TLP_GREEN.id) == TLP_GREEN
assert mstore.all_versions(TLP_GREEN.id) == [TLP_GREEN]
assert mstore.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN]
assert mstore.get(indicator.id) == indicator
assert mstore.all_versions(indicator.id) == [indicator]
assert mstore.query(Filter("id", "=", indicator.id)) == [indicator]
all_objs = mstore.query()
assert TLP_GREEN in all_objs
assert indicator in all_objs
assert len(all_objs) == 2

View File

@ -5,6 +5,7 @@ import pytest
from stix2 import Filter, MemorySource, MemoryStore, properties
from stix2.datastore import make_id
from stix2.utils import parse_into_datetime
from stix2.v21 import (
Bundle, Campaign, CustomObject, Identity, Indicator, Malware, Relationship,
)
@ -178,7 +179,7 @@ def test_memory_store_all_versions(mem_store):
))
resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001")
assert len(resp) == 1 # MemoryStore can only store 1 version of each object
assert len(resp) == 3
def test_memory_store_query(mem_store):
@ -190,25 +191,27 @@ def test_memory_store_query(mem_store):
def test_memory_store_query_single_filter(mem_store):
query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001')
resp = mem_store.query(query)
assert len(resp) == 1
assert len(resp) == 2
def test_memory_store_query_empty_query(mem_store):
resp = mem_store.query()
# sort since returned in random order
resp = sorted(resp, key=lambda k: k['id'])
assert len(resp) == 2
resp = sorted(resp, key=lambda k: (k['id'], k['modified']))
assert len(resp) == 3
assert resp[0]['id'] == 'indicator--00000000-0000-4000-8000-000000000001'
assert resp[0]['modified'] == '2017-01-27T13:49:53.936Z'
assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000002'
assert resp[1]['modified'] == '2017-01-27T13:49:53.935Z'
assert resp[0]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z')
assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000001'
assert resp[1]['modified'] == parse_into_datetime('2017-01-27T13:49:53.936Z')
assert resp[2]['id'] == 'indicator--00000000-0000-4000-8000-000000000002'
assert resp[2]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z')
def test_memory_store_query_multiple_filters(mem_store):
mem_store.source.filters.add(Filter('type', '=', 'indicator'))
query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001')
resp = mem_store.query(query)
assert len(resp) == 1
assert len(resp) == 2
def test_memory_store_save_load_file(mem_store, fs_mem_store):
@ -229,12 +232,8 @@ def test_memory_store_save_load_file(mem_store, fs_mem_store):
def test_memory_store_add_invalid_object(mem_store):
ind = ('indicator', IND1) # tuple isn't valid
with pytest.raises(TypeError) as excinfo:
with pytest.raises(TypeError):
mem_store.add(ind)
assert 'stix_data expected to be' in str(excinfo.value)
assert 'a python-stix2 object' in str(excinfo.value)
assert 'JSON formatted STIX' in str(excinfo.value)
assert 'JSON formatted STIX bundle' in str(excinfo.value)
def test_memory_store_object_with_custom_property(mem_store):

View File

@ -16,8 +16,10 @@ COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c
class MockTAXIICollectionEndpoint(Collection):
"""Mock for taxii2_client.TAXIIClient"""
def __init__(self, url, **kwargs):
super(MockTAXIICollectionEndpoint, self).__init__(url, **kwargs)
def __init__(self, url, collection_info):
super(MockTAXIICollectionEndpoint, self).__init__(
url, collection_info=collection_info,
)
self.objects = []
def add_objects(self, bundle):
@ -69,7 +71,7 @@ class MockTAXIICollectionEndpoint(Collection):
@pytest.fixture
def collection(stix_objs1):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, **{
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
"title": "Writable Collection",
"description": "This collection is a dropbox for submitting indicators",
@ -78,7 +80,7 @@ def collection(stix_objs1):
"media_types": [
"application/vnd.oasis.stix+json; version=2.0",
],
}
},
)
mock.objects.extend(stix_objs1)
@ -88,7 +90,7 @@ def collection(stix_objs1):
@pytest.fixture
def collection_no_rw_access(stix_objs1):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, **{
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
"title": "Not writeable or readable Collection",
"description": "This collection is a dropbox for submitting indicators",
@ -97,7 +99,7 @@ def collection_no_rw_access(stix_objs1):
"media_types": [
"application/vnd.oasis.stix+json; version=2.0",
],
}
},
)
mock.objects.extend(stix_objs1)

View File

@ -131,7 +131,7 @@ def test_environment_functions():
# Get both versions of the object
resp = env.all_versions(INDICATOR_ID)
assert len(resp) == 1 # should be 2, but MemoryStore only keeps 1 version of objects
assert len(resp) == 2
# Get just the most recent version of the object
resp = env.get(INDICATOR_ID)

View File

@ -12,7 +12,7 @@ EXPECTED_TLP_MARKING_DEFINITION = """{
"type": "marking-definition",
"spec_version": "2.1",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"created": "2017-01-20T00:00:00Z",
"created": "2017-01-20T00:00:00.000Z",
"definition_type": "tlp",
"definition": {
"tlp": "white"

View File

@ -1,6 +1,7 @@
"""STIX 2.1 Common Data Types and Properties."""
from collections import OrderedDict
import copy
from ..base import _STIXBase
from ..custom import _custom_marking_builder
@ -160,6 +161,13 @@ class MarkingDefinition(_STIXBase, _MarkingsMixin):
except KeyError:
raise ValueError("definition_type must be a valid marking type")
if marking_type == TLPMarking:
# TLP instances in the spec have millisecond precision unlike other markings
self._properties = copy.deepcopy(self._properties)
self._properties.update([
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
])
if not isinstance(kwargs['definition'], marking_type):
defn = _get_dict(kwargs['definition'])
kwargs['definition'] = marking_type(**defn)