Merge branch 'master' of github.com:oasis-open/cti-python-stix2

master
chrisr3d 2020-03-02 15:31:39 +01:00
commit 0f0bc42681
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
8 changed files with 393 additions and 120 deletions

View File

@ -390,14 +390,12 @@ class _Observable(_STIXBase):
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_obj_vals.append(temp_deep_copy)
elif isinstance(kwargs[key], list) and isinstance(kwargs[key][0], _STIXBase):
for obj in kwargs[key]:
temp_deep_copy = copy.deepcopy(dict(obj))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_obj_vals.append(temp_deep_copy)
elif isinstance(kwargs[key], list):
temp_deep_copy = copy.deepcopy(kwargs[key])
_recursive_stix_list_to_dict(temp_deep_copy)
streamlined_obj_vals.append(temp_deep_copy)
else:
streamlined_obj_vals.append(kwargs[key])
if streamlined_obj_vals:
data = canonicalize(streamlined_obj_vals, utf8=False)
@ -450,5 +448,20 @@ def _recursive_stix_to_dict(input_dict):
# There may stil be nested _STIXBase objects
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], list):
_recursive_stix_list_to_dict(input_dict[key])
else:
return
pass
def _recursive_stix_list_to_dict(input_list):
for i in range(len(input_list)):
if isinstance(input_list[i], _STIXBase):
input_list[i] = dict(input_list[i])
elif isinstance(input_list[i], dict):
pass
elif isinstance(input_list[i], list):
_recursive_stix_list_to_dict(input_list[i])
else:
continue
_recursive_stix_to_dict(input_list[i])

View File

@ -75,6 +75,47 @@ def parse(data, allow_custom=False, interoperability=False, version=None):
return obj
def _detect_spec_version(stix_dict):
"""
Given a dict representing a STIX object, try to detect what spec version
it is likely to comply with.
:param stix_dict: A dict with some STIX content. Must at least have a
"type" property.
:return: A string in "vXX" format, where "XX" indicates the spec version,
e.g. "v20", "v21", etc.
"""
obj_type = stix_dict["type"]
if 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SCOs, SDOs, SROs, and markings only.
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif "id" not in stix_dict:
# Only 2.0 SCOs don't have ID properties
v = "v20"
elif obj_type == 'bundle':
# Bundle without a spec_version property: must be 2.1. But to
# future-proof, use max version over all contained SCOs, with 2.1
# minimum.
v = max(
"v21",
max(
_detect_spec_version(obj) for obj in stix_dict["objects"]
),
)
elif obj_type in STIX2_OBJ_MAPS["v21"]["observables"]:
# Non-bundle object with an ID and without spec_version. Could be a
# 2.1 SCO or 2.0 SDO/SRO/marking. Check for 2.1 SCO...
v = "v21"
else:
# Not a 2.1 SCO; must be a 2.0 object.
v = "v20"
return v
def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version=None):
"""convert dictionary to full python-stix2 object
@ -110,21 +151,8 @@ def dict_to_stix2(stix_dict, allow_custom=False, interoperability=False, version
if version:
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
elif 'spec_version' in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SDOs, SROs, and markings only.
v = 'v' + stix_dict['spec_version'].replace('.', '')
elif stix_dict['type'] == 'bundle':
# bundles without spec_version are ambiguous.
if any('spec_version' in x for x in stix_dict['objects']):
# Only on 2.1 we are allowed to have 'spec_version' in SDOs/SROs.
v = 'v21'
else:
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
else:
# The spec says that SDO/SROs without spec_version will default to a
# '2.0' representation.
v = 'v20'
v = _detect_spec_version(stix_dict)
OBJ_MAP = dict(STIX2_OBJ_MAPS[v]['objects'], **STIX2_OBJ_MAPS[v]['observables'])
@ -159,6 +187,10 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
"""
obj = _get_dict(data)
if 'type' not in obj:
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
@ -170,11 +202,8 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
# If the version argument was passed, override other approaches.
v = 'v' + version.replace('.', '')
else:
# Use default version (latest) if no version was provided.
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
v = _detect_spec_version(obj)
if 'type' not in obj:
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
try:
OBJ_MAP_OBSERVABLE = STIX2_OBJ_MAPS[v]['observables']
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]

View File

@ -10,7 +10,6 @@ from stix2.base import _STIXBase
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import FilterSet, apply_common_filters
from stix2.utils import is_marking
def _add(store, stix_data, allow_custom=True, version=None):
@ -47,12 +46,10 @@ def _add(store, stix_data, allow_custom=True, version=None):
else:
stix_obj = parse(stix_data, allow_custom, version)
# Map ID directly to the object, if it is a marking. Otherwise,
# map to a family, so we can track multiple versions.
if is_marking(stix_obj):
store._data[stix_obj["id"]] = stix_obj
else:
# Map ID to a _ObjectFamily if the object is versioned, so we can track
# multiple versions. Otherwise, map directly to the object. All
# versioned objects should have a "modified" property.
if "modified" in stix_obj:
if stix_obj["id"] in store._data:
obj_family = store._data[stix_obj["id"]]
else:
@ -61,6 +58,9 @@ def _add(store, stix_data, allow_custom=True, version=None):
obj_family.add(stix_obj)
else:
store._data[stix_obj["id"]] = stix_obj
class _ObjectFamily(object):
"""
@ -267,12 +267,12 @@ class MemorySource(DataSource):
"""
stix_obj = None
if is_marking(stix_id):
stix_obj = self._data.get(stix_id)
else:
object_family = self._data.get(stix_id)
if object_family:
stix_obj = object_family.latest_version
mapped_value = self._data.get(stix_id)
if mapped_value:
if isinstance(mapped_value, _ObjectFamily):
stix_obj = mapped_value.latest_version
else:
stix_obj = mapped_value
if stix_obj:
all_filters = list(
@ -300,17 +300,13 @@ class MemorySource(DataSource):
"""
results = []
stix_objs_to_filter = None
if is_marking(stix_id):
stix_obj = self._data.get(stix_id)
if stix_obj:
stix_objs_to_filter = [stix_obj]
else:
object_family = self._data.get(stix_id)
if object_family:
stix_objs_to_filter = object_family.all_versions.values()
mapped_value = self._data.get(stix_id)
if mapped_value:
if isinstance(mapped_value, _ObjectFamily):
stix_objs_to_filter = mapped_value.all_versions.values()
else:
stix_objs_to_filter = [mapped_value]
if stix_objs_to_filter:
all_filters = list(
itertools.chain(
_composite_filters or [],

View File

@ -0,0 +1,212 @@
from __future__ import unicode_literals
import pytest
from stix2.core import _detect_spec_version
@pytest.mark.parametrize(
"obj_dict, expected_ver", [
# STIX 2.0 examples
(
{
"type": "identity",
"id": "identity--d7f72e8d-657a-43ec-9324-b3ec67a97486",
"created": "1972-05-21T05:33:09.000Z",
"modified": "1973-05-28T02:10:54.000Z",
"name": "alice",
"identity_class": "individual",
},
"v20",
),
(
{
"type": "relationship",
"id": "relationship--63b0f1b7-925e-4795-ac9b-61fb9f235f1a",
"created": "1981-08-11T13:48:19.000Z",
"modified": "2000-02-16T15:33:15.000Z",
"source_ref": "attack-pattern--9391504a-ef29-4a41-a257-5634d9edc391",
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
"v20",
),
(
{
"type": "file",
"name": "notes.txt",
},
"v20",
),
(
{
"type": "marking-definition",
"id": "marking-definition--2a13090f-a493-4b70-85fe-fa021d91dcd2",
"created": "1998-03-27T19:44:53.000Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (c) ACME Corp.",
},
},
"v20",
),
(
{
"type": "bundle",
"id": "bundle--8379cb02-8131-47c8-8a7c-9a1f0e0986b1",
"spec_version": "2.0",
"objects": [
{
"type": "identity",
"id": "identity--d7f72e8d-657a-43ec-9324-b3ec67a97486",
"created": "1972-05-21T05:33:09.000Z",
"modified": "1973-05-28T02:10:54.000Z",
"name": "alice",
"identity_class": "individual",
},
{
"type": "marking-definition",
"id": "marking-definition--2a13090f-a493-4b70-85fe-fa021d91dcd2",
"created": "1998-03-27T19:44:53.000Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (c) ACME Corp.",
},
},
],
},
"v20",
),
# STIX 2.1 examples
(
{
"type": "identity",
"id": "identity--22299b4c-bc38-4485-ad7d-8222f01c58c7",
"spec_version": "2.1",
"created": "1995-07-24T04:07:48.000Z",
"modified": "2001-07-01T09:33:17.000Z",
"name": "alice",
},
"v21",
),
(
{
"type": "relationship",
"id": "relationship--0eec232d-e1ea-4f85-8e78-0de6ae9d09f0",
"spec_version": "2.1",
"created": "1975-04-05T10:47:22.000Z",
"modified": "1983-04-25T20:56:00.000Z",
"source_ref": "attack-pattern--9391504a-ef29-4a41-a257-5634d9edc391",
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
"v21",
),
(
{
"type": "file",
"id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe",
"spec_version": "2.1",
"name": "notes.txt",
},
"v21",
),
(
{
"type": "file",
"id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe",
"name": "notes.txt",
},
"v21",
),
(
{
"type": "marking-definition",
"spec_version": "2.1",
"id": "marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
"created": "2017-01-20T00:00:00.000Z",
"definition_type": "tlp",
"name": "TLP:GREEN",
"definition": {
"tlp": "green",
},
},
"v21",
),
(
{
"type": "bundle",
"id": "bundle--d5787acd-1ffd-4630-ada3-6857698f6287",
"objects": [
{
"type": "identity",
"id": "identity--22299b4c-bc38-4485-ad7d-8222f01c58c7",
"spec_version": "2.1",
"created": "1995-07-24T04:07:48.000Z",
"modified": "2001-07-01T09:33:17.000Z",
"name": "alice",
},
{
"type": "file",
"id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe",
"name": "notes.txt",
},
],
},
"v21",
),
# Mixed spec examples
(
{
"type": "bundle",
"id": "bundle--e1a01e29-3432-401a-ab9f-c1082b056605",
"objects": [
{
"type": "identity",
"id": "identity--d7f72e8d-657a-43ec-9324-b3ec67a97486",
"created": "1972-05-21T05:33:09.000Z",
"modified": "1973-05-28T02:10:54.000Z",
"name": "alice",
"identity_class": "individual",
},
{
"type": "relationship",
"id": "relationship--63b0f1b7-925e-4795-ac9b-61fb9f235f1a",
"created": "1981-08-11T13:48:19.000Z",
"modified": "2000-02-16T15:33:15.000Z",
"source_ref": "attack-pattern--9391504a-ef29-4a41-a257-5634d9edc391",
"target_ref": "identity--ba18dde2-56d3-4a34-aa0b-fc56f5be568f",
"relationship_type": "targets",
},
],
},
"v21",
),
(
{
"type": "bundle",
"id": "bundle--eecad3d9-bb9a-4263-93f6-1c0ccc984574",
"objects": [
{
"type": "identity",
"id": "identity--d7f72e8d-657a-43ec-9324-b3ec67a97486",
"created": "1972-05-21T05:33:09.000Z",
"modified": "1973-05-28T02:10:54.000Z",
"name": "alice",
"identity_class": "individual",
},
{
"type": "file",
"id": "file--5eef3404-6a94-4db3-9a1a-5684cbea0dfe",
"name": "notes.txt",
},
],
},
"v21",
),
],
)
def test_spec_version_detect(obj_dict, expected_ver):
detected_ver = _detect_spec_version(obj_dict)
assert detected_ver == expected_ver

View File

@ -423,3 +423,24 @@ def test_object_family_internal_components(mem_source):
assert "latest=2017-01-27 13:49:53.936000+00:00>>" in str_representation
assert "latest=2017-01-27 13:49:53.936000+00:00>>" in repr_representation
def test_unversioned_objects(mem_store):
marking = {
"type": "marking-definition",
"id": "marking-definition--48e83cde-e902-4404-85b3-6e81f75ccb62",
"created": "1988-01-02T16:44:04.000Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (C) ACME Corp.",
},
}
mem_store.add(marking)
obj = mem_store.get(marking["id"])
assert obj["id"] == marking["id"]
objs = mem_store.all_versions(marking["id"])
assert len(objs) == 1
assert objs[0]["id"] == marking["id"]

View File

@ -438,3 +438,38 @@ def test_object_family_internal_components(mem_source):
assert "latest=2017-01-27 13:49:53.936000+00:00>>" in str_representation
assert "latest=2017-01-27 13:49:53.936000+00:00>>" in repr_representation
def test_unversioned_objects(mem_store):
marking = {
"type": "marking-definition",
"spec_version": "2.1",
"id": "marking-definition--48e83cde-e902-4404-85b3-6e81f75ccb62",
"created": "1988-01-02T16:44:04.000Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright (C) ACME Corp.",
},
}
file_sco = {
"type": "file",
"id": "file--bbd59c0c-1aa4-44f1-96de-80b8325372c7",
"name": "cats.png",
}
mem_store.add([marking, file_sco])
obj = mem_store.get(marking["id"])
assert obj["id"] == marking["id"]
obj = mem_store.get(file_sco["id"])
assert obj["id"] == file_sco["id"]
objs = mem_store.all_versions(marking["id"])
assert len(objs) == 1
assert objs[0]["id"] == marking["id"]
objs = mem_store.all_versions(file_sco["id"])
assert len(objs) == 1
assert objs[0]["id"] == file_sco["id"]

View File

@ -1540,37 +1540,45 @@ def test_deterministic_id_no_contributing_props():
assert uuid_obj_2.version == 4
def test_ipv4_resolves_to_refs_deprecation():
with pytest.warns(stix2.exceptions.STIXDeprecationWarning):
def test_id_gen_recursive_dict_conversion_1():
file_observable = stix2.v21.File(
name="example.exe",
size=68 * 1000,
magic_number_hex="50000000",
hashes={
"SHA-256": "841a8921140aba50671ebb0770fecc4ee308c4952cfeff8de154ab14eeef4649",
},
extensions={
"windows-pebinary-ext": stix2.v21.WindowsPEBinaryExt(
pe_type="exe",
machine_hex="014c",
sections=[
stix2.v21.WindowsPESection(
name=".data",
size=4096,
entropy=7.980693,
hashes={"SHA-256": "6e3b6f3978e5cd96ba7abee35c24e867b7e64072e2ecb22d0ee7a6e6af6894d0"},
),
],
),
},
)
stix2.v21.IPv4Address(
value="26.09.19.70",
resolves_to_refs=["mac-addr--08900593-0265-52fc-93c0-5b4a942f5887"],
)
assert file_observable.id == "file--5219d93d-13c1-5f1f-896b-039f10ec67ea"
def test_ipv4_belongs_to_refs_deprecation():
with pytest.warns(stix2.exceptions.STIXDeprecationWarning):
def test_id_gen_recursive_dict_conversion_2():
wrko = stix2.v21.WindowsRegistryKey(
values=[
stix2.v21.WindowsRegistryValueType(
name="Foo",
data="qwerty",
),
stix2.v21.WindowsRegistryValueType(
name="Bar",
data="42",
),
],
)
stix2.v21.IPv4Address(
value="21.12.19.64",
belongs_to_refs=["autonomous-system--52e0a49d-d683-5801-a7b8-145765a1e116"],
)
def test_ipv6_resolves_to_refs_deprecation():
with pytest.warns(stix2.exceptions.STIXDeprecationWarning):
stix2.v21.IPv6Address(
value="2001:0db8:85a3:0000:0000:8a2e:0370:7334",
resolves_to_refs=["mac-addr--08900593-0265-52fc-93c0-5b4a942f5887"],
)
def test_ipv6_belongs_to_refs_deprecation():
with pytest.warns(stix2.exceptions.STIXDeprecationWarning):
stix2.v21.IPv6Address(
value="2001:0db8:85a3:0000:0000:8a2e:0370:7334",
belongs_to_refs=["autonomous-system--52e0a49d-d683-5801-a7b8-145765a1e116"],
)
assert wrko.id == "windows-registry-key--c087d9fe-a03e-5922-a1cd-da116e5b8a7b"

View File

@ -7,13 +7,10 @@ Observable and do not have a ``_type`` attribute.
from collections import OrderedDict
import itertools
import warnings
from ..base import _Extension, _Observable, _STIXBase
from ..custom import _custom_extension_builder, _custom_observable_builder
from ..exceptions import (
AtLeastOnePropertyError, DependentPropertiesError, STIXDeprecationWarning,
)
from ..exceptions import AtLeastOnePropertyError, DependentPropertiesError
from ..properties import (
BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty,
@ -122,14 +119,6 @@ class DomainName(_Observable):
])
_id_contributing_properties = ["value"]
def _check_object_constraints(self):
if self.get('resolves_to_refs'):
warnings.warn(
"The 'resolves_to_refs' property of domain-name is deprecated in "
"STIX 2.1. Use the 'resolves-to' relationship type instead",
STIXDeprecationWarning,
)
class EmailAddress(_Observable):
# TODO: Add link
@ -421,21 +410,6 @@ class IPv4Address(_Observable):
])
_id_contributing_properties = ["value"]
def _check_object_constraints(self):
if self.get('resolves_to_refs'):
warnings.warn(
"The 'resolves_to_refs' property of ipv4-addr is deprecated in "
"STIX 2.1. Use the 'resolves-to' relationship type instead",
STIXDeprecationWarning,
)
if self.get('belongs_to_refs'):
warnings.warn(
"The 'belongs_to_refs' property of ipv4-addr is deprecated in "
"STIX 2.1. Use the 'belongs-to' relationship type instead",
STIXDeprecationWarning,
)
class IPv6Address(_Observable):
# TODO: Add link
@ -458,21 +432,6 @@ class IPv6Address(_Observable):
])
_id_contributing_properties = ["value"]
def _check_object_constraints(self):
if self.get('resolves_to_refs'):
warnings.warn(
"The 'resolves_to_refs' property of ipv6-addr is deprecated in "
"STIX 2.1. Use the 'resolves-to' relationship type instead",
STIXDeprecationWarning,
)
if self.get('belongs_to_refs'):
warnings.warn(
"The 'belongs_to_refs' property of ipv6-addr is deprecated in "
"STIX 2.1. Use the 'belongs-to' relationship type instead",
STIXDeprecationWarning,
)
class MACAddress(_Observable):
# TODO: Add link