diff --git a/stix2/core.py b/stix2/core.py index 5d5ae3b..9594e49 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -29,10 +29,9 @@ def parse(data, allow_custom=False, version=None): allow_custom (bool): Whether to allow custom properties as well unknown custom objects. Note that unknown custom objects cannot be parsed into STIX objects, and will be returned as is. Default: False. - version (str): Only used for bundles. If the spec_version property is - missing, it is ambiguous what spec should be used to parse the - bundle. In this case, this version parameter gives the spec - version to use. + version (str): If present, it forces the parser to use the version + provided. Otherwise, the library will make the best effort based + on checking the "spec_version" property. Returns: An instantiated Python STIX object. @@ -59,30 +58,29 @@ def parse(data, allow_custom=False, version=None): def dict_to_stix2(stix_dict, allow_custom=False, version=None): """convert dictionary to full python-stix2 object - Args: - stix_dict (dict): a python dictionary of a STIX object - that (presumably) is semantically correct to be parsed - into a full python-stix2 obj - allow_custom (bool): Whether to allow custom properties as well - unknown custom objects. Note that unknown custom objects cannot - be parsed into STIX objects, and will be returned as is. - Default: False. - version: Only used for bundles. If the spec_version property is - missing, it is ambiguous what spec should be used to parse the - bundle. In this case, this version parameter gives the spec - version to use. + Args: + stix_dict (dict): a python dictionary of a STIX object + that (presumably) is semantically correct to be parsed + into a full python-stix2 obj + allow_custom (bool): Whether to allow custom properties as well + unknown custom objects. Note that unknown custom objects cannot + be parsed into STIX objects, and will be returned as is. + Default: False. + version (str): If present, it forces the parser to use the version + provided. Otherwise, the library will make the best effort based + on checking the "spec_version" property. - Returns: - An instantiated Python STIX object + Returns: + An instantiated Python STIX object - Warnings: - 'allow_custom=True' will allow for the return of any supplied STIX - dict(s) that cannot be found to map to any known STIX object types - (both STIX2 domain objects or defined custom STIX2 objects); NO - validation is done. This is done to allow the processing of - possibly unknown custom STIX objects (example scenario: I need to - query a third-party TAXII endpoint that could provide custom STIX - objects that I don't know about ahead of time) + Warnings: + 'allow_custom=True' will allow for the return of any supplied STIX + dict(s) that cannot be found to map to any known STIX object types + (both STIX2 domain objects or defined custom STIX2 objects); NO + validation is done. This is done to allow the processing of + possibly unknown custom STIX objects (example scenario: I need to + query a third-party TAXII endpoint that could provide custom STIX + objects that I don't know about ahead of time) """ if 'type' not in stix_dict: @@ -130,11 +128,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): object being parsed. Use empty list if no valid refs are present. allow_custom (bool): Whether to allow custom properties or not. Default: False. - version (str): If the spec version is missing, the latest supported - stix2 version will be used to parse the observable object. + version (str): If present, it forces the parser to use the version + provided. Otherwise, the library will use the latest version. Returns: An instantiated Python STIX Cyber Observable object. + """ obj = _get_dict(data) # get deep copy since we are going modify the dict and might @@ -187,6 +186,7 @@ def _register_object(new_type, version=None): new_type (class): A class to register in the Object map. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. + """ if version: v = 'v' + version.replace('.', '') @@ -205,6 +205,7 @@ def _register_marking(new_marking, version=None): new_marking (class): A class to register in the Marking map. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. + """ if version: v = 'v' + version.replace('.', '') @@ -223,6 +224,7 @@ def _register_observable(new_observable, version=None): new_observable (class): A class to register in the Observables map. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. + """ if version: v = 'v' + version.replace('.', '') @@ -243,6 +245,7 @@ def _register_observable_extension(observable, new_extension, version=None): Extensions map. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. + """ if version: v = 'v' + version.replace('.', '') diff --git a/stix2/datastore/memory.py b/stix2/datastore/memory.py index 819c0bf..7d5fb1c 100644 --- a/stix2/datastore/memory.py +++ b/stix2/datastore/memory.py @@ -21,7 +21,11 @@ def _add(store, stix_data, allow_custom=True, version=None): Recursive function, breaks down STIX Bundles and lists. Args: + store: A MemoryStore, MemorySink or MemorySource object. stix_data (list OR dict OR STIX object): STIX objects to be added + allow_custom (bool): Whether to allow custom properties as well unknown + custom objects. Note that unknown custom objects cannot be parsed + into STIX objects, and will be returned as is. Default: False. version (str): Which STIX2 version to lock the parser to. (e.g. "2.0", "2.1"). If None, the library makes the best effort to figure out the spec representation of the object. @@ -93,8 +97,10 @@ class _ObjectFamily(object): self.latest_version = obj def __str__(self): - return "<<{}; latest={}>>".format(self.all_versions, - self.latest_version.modified) + return "<<{}; latest={}>>".format( + self.all_versions, + self.latest_version.modified, + ) def __repr__(self): return str(self) @@ -278,8 +284,8 @@ class MemorySource(DataSource): all_filters = list( itertools.chain( _composite_filters or [], - self.filters - ) + self.filters, + ), ) stix_obj = next(apply_common_filters([stix_obj], all_filters), None) @@ -317,12 +323,12 @@ class MemorySource(DataSource): all_filters = list( itertools.chain( _composite_filters or [], - self.filters - ) + self.filters, + ), ) results.extend( - apply_common_filters(stix_objs_to_filter, all_filters) + apply_common_filters(stix_objs_to_filter, all_filters), ) return results diff --git a/stix2/test/v20/test_datastore_memory.py b/stix2/test/v20/test_datastore_memory.py index ec27f3f..ae3f754 100644 --- a/stix2/test/v20/test_datastore_memory.py +++ b/stix2/test/v20/test_datastore_memory.py @@ -5,10 +5,10 @@ import pytest from stix2 import Filter, MemorySource, MemoryStore, properties from stix2.datastore import make_id +from stix2.utils import parse_into_datetime from stix2.v20 import ( Bundle, Campaign, CustomObject, Identity, Indicator, Malware, Relationship, ) -from stix2.utils import parse_into_datetime from .constants import ( CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID, diff --git a/stix2/test/v20/test_datastore_taxii.py b/stix2/test/v20/test_datastore_taxii.py index a9765cb..9ac3a62 100644 --- a/stix2/test/v20/test_datastore_taxii.py +++ b/stix2/test/v20/test_datastore_taxii.py @@ -18,7 +18,7 @@ class MockTAXIICollectionEndpoint(Collection): def __init__(self, url, collection_info): super(MockTAXIICollectionEndpoint, self).__init__( - url, collection_info=collection_info + url, collection_info=collection_info, ) self.objects = [] @@ -80,7 +80,7 @@ def collection(stix_objs1): "media_types": [ "application/vnd.oasis.stix+json; version=2.0", ], - } + }, ) mock.objects.extend(stix_objs1) @@ -99,7 +99,7 @@ def collection_no_rw_access(stix_objs1): "media_types": [ "application/vnd.oasis.stix+json; version=2.0", ], - } + }, ) mock.objects.extend(stix_objs1) diff --git a/stix2/test/v21/test_datastore_composite.py b/stix2/test/v21/test_datastore_composite.py index 39c7c48..76119c3 100644 --- a/stix2/test/v21/test_datastore_composite.py +++ b/stix2/test/v21/test_datastore_composite.py @@ -2,7 +2,9 @@ import pytest from stix2.datastore import CompositeDataSource, make_id from stix2.datastore.filters import Filter -from stix2.datastore.memory import MemorySink, MemorySource +from stix2.datastore.memory import MemorySink, MemorySource, MemoryStore +from stix2.utils import parse_into_datetime +from stix2.v21.common import TLP_GREEN def test_add_remove_composite_datasource(): @@ -47,14 +49,14 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2): indicators = cds1.all_versions("indicator--00000000-0000-4000-8000-000000000001") # In STIX_OBJS2 changed the 'modified' property to a later time... - assert len(indicators) == 2 + assert len(indicators) == 3 cds1.add_data_sources([cds2]) indicator = cds1.get("indicator--00000000-0000-4000-8000-000000000001") assert indicator["id"] == "indicator--00000000-0000-4000-8000-000000000001" - assert indicator["modified"] == "2017-01-31T13:49:53.935Z" + assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z") assert indicator["type"] == "indicator" query1 = [ @@ -71,20 +73,80 @@ def test_composite_datasource_operations(stix_objs1, stix_objs2): # STIX_OBJS2 has indicator with later time, one with different id, one with # original time in STIX_OBJS1 - assert len(results) == 3 + assert len(results) == 4 indicator = cds1.get("indicator--00000000-0000-4000-8000-000000000001") assert indicator["id"] == "indicator--00000000-0000-4000-8000-000000000001" - assert indicator["modified"] == "2017-01-31T13:49:53.935Z" + assert indicator["modified"] == parse_into_datetime("2017-01-31T13:49:53.935Z") assert indicator["type"] == "indicator" - # There is only one indicator with different ID. Since we use the same data - # when deduplicated, only two indicators (one with different modified). results = cds1.all_versions("indicator--00000000-0000-4000-8000-000000000001") - assert len(results) == 2 + assert len(results) == 3 # Since we have filters already associated with our CompositeSource providing # nothing returns the same as cds1.query(query1) (the associated query is query2) results = cds1.query([]) - assert len(results) == 3 + assert len(results) == 4 + + +def test_source_markings(): + msrc = MemorySource(TLP_GREEN) + + assert msrc.get(TLP_GREEN.id) == TLP_GREEN + assert msrc.all_versions(TLP_GREEN.id) == [TLP_GREEN] + assert msrc.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN] + + +def test_sink_markings(): + # just make sure there is no crash + msink = MemorySink(TLP_GREEN) + msink.add(TLP_GREEN) + + +def test_store_markings(): + mstore = MemoryStore(TLP_GREEN) + + assert mstore.get(TLP_GREEN.id) == TLP_GREEN + assert mstore.all_versions(TLP_GREEN.id) == [TLP_GREEN] + assert mstore.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN] + + +def test_source_mixed(indicator): + msrc = MemorySource([TLP_GREEN, indicator]) + + assert msrc.get(TLP_GREEN.id) == TLP_GREEN + assert msrc.all_versions(TLP_GREEN.id) == [TLP_GREEN] + assert msrc.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN] + + assert msrc.get(indicator.id) == indicator + assert msrc.all_versions(indicator.id) == [indicator] + assert msrc.query(Filter("id", "=", indicator.id)) == [indicator] + + all_objs = msrc.query() + assert TLP_GREEN in all_objs + assert indicator in all_objs + assert len(all_objs) == 2 + + +def test_sink_mixed(indicator): + # just make sure there is no crash + msink = MemorySink([TLP_GREEN, indicator]) + msink.add([TLP_GREEN, indicator]) + + +def test_store_mixed(indicator): + mstore = MemoryStore([TLP_GREEN, indicator]) + + assert mstore.get(TLP_GREEN.id) == TLP_GREEN + assert mstore.all_versions(TLP_GREEN.id) == [TLP_GREEN] + assert mstore.query(Filter("id", "=", TLP_GREEN.id)) == [TLP_GREEN] + + assert mstore.get(indicator.id) == indicator + assert mstore.all_versions(indicator.id) == [indicator] + assert mstore.query(Filter("id", "=", indicator.id)) == [indicator] + + all_objs = mstore.query() + assert TLP_GREEN in all_objs + assert indicator in all_objs + assert len(all_objs) == 2 diff --git a/stix2/test/v21/test_datastore_memory.py b/stix2/test/v21/test_datastore_memory.py index 2bc4730..d77c6b4 100644 --- a/stix2/test/v21/test_datastore_memory.py +++ b/stix2/test/v21/test_datastore_memory.py @@ -5,6 +5,7 @@ import pytest from stix2 import Filter, MemorySource, MemoryStore, properties from stix2.datastore import make_id +from stix2.utils import parse_into_datetime from stix2.v21 import ( Bundle, Campaign, CustomObject, Identity, Indicator, Malware, Relationship, ) @@ -178,7 +179,7 @@ def test_memory_store_all_versions(mem_store): )) resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001") - assert len(resp) == 1 # MemoryStore can only store 1 version of each object + assert len(resp) == 3 def test_memory_store_query(mem_store): @@ -190,25 +191,27 @@ def test_memory_store_query(mem_store): def test_memory_store_query_single_filter(mem_store): query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001') resp = mem_store.query(query) - assert len(resp) == 1 + assert len(resp) == 2 def test_memory_store_query_empty_query(mem_store): resp = mem_store.query() # sort since returned in random order - resp = sorted(resp, key=lambda k: k['id']) - assert len(resp) == 2 + resp = sorted(resp, key=lambda k: (k['id'], k['modified'])) + assert len(resp) == 3 assert resp[0]['id'] == 'indicator--00000000-0000-4000-8000-000000000001' - assert resp[0]['modified'] == '2017-01-27T13:49:53.936Z' - assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000002' - assert resp[1]['modified'] == '2017-01-27T13:49:53.935Z' + assert resp[0]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z') + assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000001' + assert resp[1]['modified'] == parse_into_datetime('2017-01-27T13:49:53.936Z') + assert resp[2]['id'] == 'indicator--00000000-0000-4000-8000-000000000002' + assert resp[2]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z') def test_memory_store_query_multiple_filters(mem_store): mem_store.source.filters.add(Filter('type', '=', 'indicator')) query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001') resp = mem_store.query(query) - assert len(resp) == 1 + assert len(resp) == 2 def test_memory_store_save_load_file(mem_store, fs_mem_store): @@ -229,12 +232,8 @@ def test_memory_store_save_load_file(mem_store, fs_mem_store): def test_memory_store_add_invalid_object(mem_store): ind = ('indicator', IND1) # tuple isn't valid - with pytest.raises(TypeError) as excinfo: + with pytest.raises(TypeError): mem_store.add(ind) - assert 'stix_data expected to be' in str(excinfo.value) - assert 'a python-stix2 object' in str(excinfo.value) - assert 'JSON formatted STIX' in str(excinfo.value) - assert 'JSON formatted STIX bundle' in str(excinfo.value) def test_memory_store_object_with_custom_property(mem_store): diff --git a/stix2/test/v21/test_datastore_taxii.py b/stix2/test/v21/test_datastore_taxii.py index 0ba27b1..9c555ba 100644 --- a/stix2/test/v21/test_datastore_taxii.py +++ b/stix2/test/v21/test_datastore_taxii.py @@ -16,8 +16,10 @@ COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c class MockTAXIICollectionEndpoint(Collection): """Mock for taxii2_client.TAXIIClient""" - def __init__(self, url, **kwargs): - super(MockTAXIICollectionEndpoint, self).__init__(url, **kwargs) + def __init__(self, url, collection_info): + super(MockTAXIICollectionEndpoint, self).__init__( + url, collection_info=collection_info, + ) self.objects = [] def add_objects(self, bundle): @@ -69,7 +71,7 @@ class MockTAXIICollectionEndpoint(Collection): @pytest.fixture def collection(stix_objs1): mock = MockTAXIICollectionEndpoint( - COLLECTION_URL, **{ + COLLECTION_URL, { "id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116", "title": "Writable Collection", "description": "This collection is a dropbox for submitting indicators", @@ -78,7 +80,7 @@ def collection(stix_objs1): "media_types": [ "application/vnd.oasis.stix+json; version=2.0", ], - } + }, ) mock.objects.extend(stix_objs1) @@ -88,7 +90,7 @@ def collection(stix_objs1): @pytest.fixture def collection_no_rw_access(stix_objs1): mock = MockTAXIICollectionEndpoint( - COLLECTION_URL, **{ + COLLECTION_URL, { "id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116", "title": "Not writeable or readable Collection", "description": "This collection is a dropbox for submitting indicators", @@ -97,7 +99,7 @@ def collection_no_rw_access(stix_objs1): "media_types": [ "application/vnd.oasis.stix+json; version=2.0", ], - } + }, ) mock.objects.extend(stix_objs1) diff --git a/stix2/test/v21/test_environment.py b/stix2/test/v21/test_environment.py index 21f0d7c..e08971e 100644 --- a/stix2/test/v21/test_environment.py +++ b/stix2/test/v21/test_environment.py @@ -131,7 +131,7 @@ def test_environment_functions(): # Get both versions of the object resp = env.all_versions(INDICATOR_ID) - assert len(resp) == 1 # should be 2, but MemoryStore only keeps 1 version of objects + assert len(resp) == 2 # Get just the most recent version of the object resp = env.get(INDICATOR_ID) diff --git a/stix2/test/v21/test_markings.py b/stix2/test/v21/test_markings.py index 4fb6203..7782236 100644 --- a/stix2/test/v21/test_markings.py +++ b/stix2/test/v21/test_markings.py @@ -12,7 +12,7 @@ EXPECTED_TLP_MARKING_DEFINITION = """{ "type": "marking-definition", "spec_version": "2.1", "id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", - "created": "2017-01-20T00:00:00Z", + "created": "2017-01-20T00:00:00.000Z", "definition_type": "tlp", "definition": { "tlp": "white" diff --git a/stix2/v21/common.py b/stix2/v21/common.py index 3dd3222..87c7225 100644 --- a/stix2/v21/common.py +++ b/stix2/v21/common.py @@ -1,6 +1,7 @@ """STIX 2.1 Common Data Types and Properties.""" from collections import OrderedDict +import copy from ..base import _STIXBase from ..custom import _custom_marking_builder @@ -160,6 +161,13 @@ class MarkingDefinition(_STIXBase, _MarkingsMixin): except KeyError: raise ValueError("definition_type must be a valid marking type") + if marking_type == TLPMarking: + # TLP instances in the spec have millisecond precision unlike other markings + self._properties = copy.deepcopy(self._properties) + self._properties.update([ + ('created', TimestampProperty(default=lambda: NOW, precision='millisecond')), + ]) + if not isinstance(kwargs['definition'], marking_type): defn = _get_dict(kwargs['definition']) kwargs['definition'] = marking_type(**defn)