WIP- getting close though

stix2.0
= 2018-04-13 11:08:03 -04:00
parent 31fc1c369a
commit 1a1e5e1616
12 changed files with 246 additions and 204 deletions

View File

@ -50,7 +50,7 @@ from .patterns import (AndBooleanExpression, AndObservationExpression,
ReferenceObjectPathComponent, RepeatQualifier,
StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier)
from .utils import get_dict, new_version, revoke
from .utils import new_version, revoke
from .v20 import * # This import will always be the latest STIX 2.X version
from .version import __version__

View File

@ -9,7 +9,7 @@ import stix2
from . import exceptions
from .base import _STIXBase
from .properties import IDProperty, ListProperty, Property, TypeProperty
from .utils import get_class_hierarchy_names, get_dict
from .utils import _get_dict, get_class_hierarchy_names
class STIXObjectProperty(Property):
@ -25,7 +25,7 @@ class STIXObjectProperty(Property):
for x in get_class_hierarchy_names(value)):
return value
try:
dictified = get_dict(value)
dictified = _get_dict(value)
except ValueError:
raise ValueError("This property may only contain a dictionary or object")
if dictified == {}:
@ -95,7 +95,7 @@ def parse(data, allow_custom=False, version=None):
"""
# convert STIX object to dict, if not already
obj = get_dict(data)
obj = _get_dict(data)
# convert dict to full python-stix2 obj
obj = dict_to_stix2(obj, allow_custom, version)

View File

@ -16,7 +16,7 @@ import uuid
from six import with_metaclass
from stix2.datastore.filters import Filter, FilterSet, _assemble_filters
from stix2.datastore.filters import Filter, FilterSet
from stix2.utils import deduplicate
@ -379,10 +379,10 @@ class DataSource(with_metaclass(ABCMeta)):
ids.discard(obj_id)
# Assemble filters
filter_list = _assemble_filters(filters)
filter_list = FilterSet(filters)
for i in ids:
results.extend(self.query(filter_list + [Filter('id', '=', i)]))
results.extend(self.query([f for f in filter_list] + [Filter('id', '=', i)]))
return results

View File

@ -4,14 +4,15 @@ Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
"""
import collections
from datetime import datetime
from stix2.utils import STIXdatetime
from stix2.utils import format_datetime
"""Supported filter operations"""
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
"""Supported filter value types"""
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple, STIXdatetime]
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
try:
FILTER_VALUE_TYPES.append(unicode)
except NameError:
@ -19,23 +20,6 @@ except NameError:
pass
def deduplicate_filters(filters):
"""utility for deduplicating list of filters, this
is used when 'set()' cannot be used as one of the
filter values is a dict (or non-hashable type)
Args:
filters (list): a list of filters
Returns: list of unique filters
"""
unique_filters = []
for filter_ in filters:
if filter_ not in unique_filters:
unique_filters.append(filter_)
return unique_filters
def _check_filter_components(prop, op, value):
"""Check that filter meets minimum validity.
@ -63,37 +47,6 @@ def _check_filter_components(prop, op, value):
return True
def _assemble_filters(filters1=None, filters2=None):
"""Assemble a list of filters.
This can be used to allow certain functions to work correctly no matter if
the user provides a single filter or a list of them.
Args:
filters1 (Filter or list, optional): The single Filter or list of Filters to
coerce into a list of Filters.
filters2 (Filter or list, optional): The single Filter or list of Filters to
append to the list of Filters.
Returns:
List of Filters.
"""
if filters1 is None:
filter_list = []
elif not isinstance(filters1, list):
filter_list = [filters1]
else:
filter_list = filters1
if isinstance(filters2, list):
filter_list.extend(filters2)
elif filters2 is not None:
filter_list.append(filters2)
return filter_list
class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
"""STIX 2 filters that support the querying functionality of STIX 2
DataStores and DataSources.
@ -116,6 +69,11 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
if isinstance(value, list):
value = tuple(value)
if isinstance(value, datetime):
# if value is a datetime obj, convert to str
dt_str = format_datetime(value)
value = dt_str # use temp variable to avoid deepcopy operation
_check_filter_components(prop, op, value)
self = super(Filter, cls).__new__(cls, prop, op, value)
@ -131,6 +89,12 @@ class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
True if property matches the filter,
False otherwise.
"""
if isinstance(stix_obj_property, datetime):
# if a datetime obj, convert to str before comparison
# NOTE: this check seems like it should be done upstream
# but will put here for now
stix_obj_property = format_datetime(stix_obj_property)
if self.op == "=":
return stix_obj_property == self.value
elif self.op == "!=":
@ -252,7 +216,7 @@ class FilterSet(object):
# DataStore/Environment usage of filter operations
return
if not isinstance(filters, FilterSet) and not isinstance(filters, list):
if not isinstance(filters, (FilterSet, list)):
filters = [filters]
for f in filters:
@ -268,7 +232,7 @@ class FilterSet(object):
# DataStore/Environemnt usage of filter ops
return
if not isinstance(filters, FilterSet) and not isinstance(filters, list):
if not isinstance(filters, (FilterSet, list)):
filters = [filters]
for f in filters:

View File

@ -12,7 +12,7 @@ from stix2patterns.validator import run_validator
from .base import _STIXBase
from .exceptions import DictionaryKeyError
from .utils import get_dict, parse_into_datetime
from .utils import _get_dict, parse_into_datetime
class Property(object):
@ -232,7 +232,7 @@ class DictionaryProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
dictified = _get_dict(value)
except ValueError:
raise ValueError("The dictionary property must contain a dictionary")
if dictified == {}:

View File

@ -80,6 +80,7 @@ def test_identity_custom_property_allowed():
def test_parse_identity_custom_property(data):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
identity = stix2.parse(data)
assert str(excinfo.value.cls) == str(stix2.Identity)
assert excinfo.value.cls == stix2.Identity
assert excinfo.value.properties == ['foo']
assert "Unexpected properties for" in str(excinfo.value)

View File

@ -5,7 +5,7 @@ from stix2 import Filter, MemorySink, MemorySource
from stix2.core import parse
from stix2.datastore import (CompositeDataSource, DataSink, DataSource,
make_id, taxii)
from stix2.datastore.filters import _assemble_filters, apply_common_filters
from stix2.datastore.filters import apply_common_filters
from stix2.utils import deduplicate, parse_into_datetime
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -21,6 +21,108 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
"description": "\n\nTITLE:\n\tPoison Ivy",
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"labels": [
"remote-access-trojan"
],
"modified": "2017-01-27T13:49:53.997Z",
"name": "Poison Ivy",
"type": "malware"
},
{
"created": "2014-05-08T09:00:00.000Z",
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"labels": [
"file-hash-watchlist"
],
"modified": "2014-05-08T09:00:00.000Z",
"name": "File hash for Poison Ivy variant",
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
"type": "indicator",
"valid_from": "2014-05-08T09:00:00.000000Z"
},
{
"created": "2014-05-08T09:00:00.000Z",
"granular_markings": [
{
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
"selectors": [
"relationship_type"
]
}
],
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
"modified": "2014-05-08T09:00:00.000Z",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"relationship_type": "indicates",
"revoked": True,
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
},
{
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
"created": "2016-02-14T00:00:00.000Z",
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
"modified": "2016-02-14T00:00:00.000Z",
"type": "vulnerability",
"name": "CVE-2014-0160",
"description": "The (1) TLS...",
"external_references": [
{
"source_name": "cve",
"external_id": "CVE-2014-0160"
}
],
"labels": ["heartbleed", "has-logo"]
},
{
"type": "observed-data",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T19:58:16.000Z",
"modified": "2016-04-06T19:58:16.000Z",
"first_observed": "2015-12-21T19:00:00Z",
"last_observed": "2015-12-21T19:00:00Z",
"number_observed": 1,
"objects": {
"0": {
"type": "file",
"name": "HAL 9000.exe"
}
}
}
]
# same as above objects but converted to real Python STIX2 objects
# to test filters against true Python STIX2 objects
real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs]
filters = [
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "description"),
Filter("external_references.source_name", "=", "CVE"),
Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}})
]
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -240,111 +342,7 @@ def test_filter_type_underscore_check():
assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value)
def test_apply_common_filters():
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
"description": "\n\nTITLE:\n\tPoison Ivy",
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"labels": [
"remote-access-trojan"
],
"modified": "2017-01-27T13:49:53.997Z",
"name": "Poison Ivy",
"type": "malware"
},
{
"created": "2014-05-08T09:00:00.000Z",
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"labels": [
"file-hash-watchlist"
],
"modified": "2014-05-08T09:00:00.000Z",
"name": "File hash for Poison Ivy variant",
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
"type": "indicator",
"valid_from": "2014-05-08T09:00:00.000000Z"
},
{
"created": "2014-05-08T09:00:00.000Z",
"granular_markings": [
{
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
"selectors": [
"relationship_type"
]
}
],
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
"modified": "2014-05-08T09:00:00.000Z",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"relationship_type": "indicates",
"revoked": True,
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
},
{
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
"created": "2016-02-14T00:00:00.000Z",
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
"modified": "2016-02-14T00:00:00.000Z",
"type": "vulnerability",
"name": "CVE-2014-0160",
"description": "The (1) TLS...",
"external_references": [
{
"source_name": "cve",
"external_id": "CVE-2014-0160"
}
],
"labels": ["heartbleed", "has-logo"]
},
{
"type": "observed-data",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T19:58:16.000Z",
"modified": "2016-04-06T19:58:16.000Z",
"first_observed": "2015-12-21T19:00:00Z",
"last_observed": "2015-12-21T19:00:00Z",
"number_observed": 1,
"objects": {
"0": {
"type": "file",
"name": "HAL 9000.exe"
}
}
}
]
# same as above objects but converted to real Python STIX2 objects
# to test filters against true Python STIX2 objects
print(stix_objs)
real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs]
print("after\n\n")
print(stix_objs)
filters = [
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "description"),
Filter("external_references.source_name", "=", "CVE"),
Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}})
]
def test_apply_common_filters0():
# "Return any object whose type is not relationship"
resp = list(apply_common_filters(stix_objs, [filters[0]]))
ids = [r['id'] for r in resp]
@ -360,6 +358,8 @@ def test_apply_common_filters():
assert real_stix_objs[3].id in ids
assert len(ids) == 4
def test_apply_common_filters1():
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
resp = list(apply_common_filters(stix_objs, [filters[1]]))
assert resp[0]['id'] == stix_objs[2]['id']
@ -369,6 +369,8 @@ def test_apply_common_filters():
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters2():
# "Return any object that contains remote-access-trojan in labels"
resp = list(apply_common_filters(stix_objs, [filters[2]]))
assert resp[0]['id'] == stix_objs[0]['id']
@ -378,11 +380,19 @@ def test_apply_common_filters():
assert resp[0].id == real_stix_objs[0].id
assert len(resp) == 1
def test_apply_common_filters3():
# "Return any object created after 2015-01-01T01:00:00.000Z"
resp = list(apply_common_filters(stix_objs, [filters[3]]))
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 3
resp = list(apply_common_filters(real_stix_objs, [filters[3]]))
assert resp[0].id == real_stix_objs[0].id
assert len(resp) == 3
def test_apply_common_filters4():
# "Return any revoked object"
resp = list(apply_common_filters(stix_objs, [filters[4]]))
assert resp[0]['id'] == stix_objs[2]['id']
@ -392,6 +402,8 @@ def test_apply_common_filters():
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters5():
# "Return any object whose not revoked"
# Note that if 'revoked' property is not present in object.
# Currently we can't use such an expression to filter for... :(
@ -401,6 +413,8 @@ def test_apply_common_filters():
resp = list(apply_common_filters(real_stix_objs, [filters[5]]))
assert len(resp) == 0
def test_apply_common_filters6():
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
resp = list(apply_common_filters(stix_objs, [filters[6]]))
assert resp[0]['id'] == stix_objs[2]['id']
@ -410,6 +424,8 @@ def test_apply_common_filters():
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters7():
# "Return any object that contains relationship_type in their selectors AND
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
@ -420,6 +436,8 @@ def test_apply_common_filters():
assert resp[0].id == real_stix_objs[2].id
assert len(resp) == 1
def test_apply_common_filters8():
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
resp = list(apply_common_filters(stix_objs, [filters[9]]))
assert resp[0]['id'] == stix_objs[3]['id']
@ -429,6 +447,8 @@ def test_apply_common_filters():
assert resp[0].id == real_stix_objs[3].id
assert len(resp) == 1
def test_apply_common_filters9():
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
resp = list(apply_common_filters(stix_objs, [filters[10]]))
assert len(resp) == 1
@ -436,6 +456,8 @@ def test_apply_common_filters():
resp = list(apply_common_filters(real_stix_objs, [filters[10]]))
assert len(resp) == 1
def test_apply_common_filters10():
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
resp = list(apply_common_filters(stix_objs, [filters[11]]))
assert len(resp) == 0
@ -443,6 +465,8 @@ def test_apply_common_filters():
resp = list(apply_common_filters(real_stix_objs, [filters[11]]))
assert len(resp) == 0
def test_apply_common_filters11():
# "Return any object that contains description in its selectors" (None)
resp = list(apply_common_filters(stix_objs, [filters[12]]))
assert len(resp) == 0
@ -450,6 +474,8 @@ def test_apply_common_filters():
resp = list(apply_common_filters(real_stix_objs, [filters[12]]))
assert len(resp) == 0
def test_apply_common_filters12():
# "Return any object that matches CVE in source_name" (None, case sensitive)
resp = list(apply_common_filters(stix_objs, [filters[13]]))
assert len(resp) == 0
@ -457,19 +483,51 @@ def test_apply_common_filters():
resp = list(apply_common_filters(real_stix_objs, [filters[13]]))
assert len(resp) == 0
def test_apply_common_filters13():
# Return any object that matches file object in "objects"
# BUG: This test is brokem , weird behavior, the file obj
# in stix_objs is being parsed into real python-stix2 obj even though
# it never goes through parse() --> BAD <_<
resp = list(apply_common_filters(stix_objs, [filters[14]]))
assert resp[0]["id"] == stix_objs[14]["id"]
assert resp[0]["id"] == stix_objs[4]["id"]
assert len(resp) == 1
resp = list(apply_common_filters(real_stix_objs, [filters[14]]))
assert resp[0].id == real_stix_objs[14].id
assert resp[0].id == real_stix_objs[4].id
assert len(resp) == 1
def test_datetime_filter_behavior():
"""if a filter is initialized with its value being a datetime object
OR the STIX object property being filtered on is a datetime object, all
resulting comparisons executed are done on the string representations
of the datetime objects, as the Filter functionality will convert
all datetime objects to there string forms using format_datetim()
This test makes sure all datetime comparisons are carried out correctly
"""
filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond"))
filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z")
# compare datetime string to filter w/ datetime obj
resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
# compare datetime obj to filter w/ datetime obj
resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
# compare datetime string to filter w/ str
resp = list(apply_common_filters(stix_objs, [filter_with_str]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
# compare datetime obj to filter w/ str
resp = list(apply_common_filters(real_stix_objs, [filter_with_str]))
assert len(resp) == 1
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
def test_filters0():
# "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]))
@ -510,7 +568,9 @@ def test_filters3():
assert len(resp) == 2
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))
print(fv)
resp = list(apply_common_filters(REAL_STIX_OBJS2, [fv]))
assert resp[0].id == REAL_STIX_OBJS2[1].id
assert len(resp) == 2
@ -592,15 +652,6 @@ def test_filters7():
assert len(resp) == 1
def test_assemble_filters():
filter1 = Filter("name", "=", "Malicious site hosting downloader")
filter2 = Filter("modified", ">", "2017-01-28T13:49:53.935Z")
result = _assemble_filters(filter1, filter2)
assert len(result) == 2
assert result[0].property == 'name'
assert result[1].property == 'modified'
def test_deduplicate():
unique = deduplicate(STIX_OBJS1)

View File

@ -62,7 +62,7 @@ def test_parse_datetime_invalid(ts):
[("a", 1,)],
])
def test_get_dict(data):
assert stix2.utils.get_dict(data)
assert stix2.utils._get_dict(data)
@pytest.mark.parametrize('data', [
@ -73,7 +73,7 @@ def test_get_dict(data):
])
def test_get_dict_invalid(data):
with pytest.raises(ValueError):
stix2.utils.get_dict(data)
stix2.utils._get_dict(data)
@pytest.mark.parametrize('stix_id, typ', [

View File

@ -140,7 +140,7 @@ def parse_into_datetime(value, precision=None):
return STIXdatetime(ts, precision=precision)
def get_dict(data):
def _get_dict(data):
"""Return data as a dictionary.
Input can be a dictionary, string, or file-like object.

View File

@ -7,7 +7,7 @@ from ..markings import _MarkingsMixin
from ..properties import (HashesProperty, IDProperty, ListProperty, Property,
ReferenceProperty, SelectorProperty, StringProperty,
TimestampProperty, TypeProperty)
from ..utils import NOW, get_dict
from ..utils import NOW, _get_dict
class ExternalReference(_STIXBase):
@ -125,7 +125,7 @@ class MarkingDefinition(_STIXBase, _MarkingsMixin):
raise ValueError("definition_type must be a valid marking type")
if not isinstance(kwargs['definition'], marking_type):
defn = get_dict(kwargs['definition'])
defn = _get_dict(kwargs['definition'])
kwargs['definition'] = marking_type(**defn)
super(MarkingDefinition, self).__init__(**kwargs)

View File

@ -6,6 +6,7 @@ Observable and do not have a ``_type`` attribute.
"""
from collections import OrderedDict
import copy
from ..base import _Extension, _Observable, _STIXBase
from ..exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
@ -15,7 +16,7 @@ from ..properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
HashesProperty, HexProperty, IntegerProperty,
ListProperty, ObjectReferenceProperty, Property,
StringProperty, TimestampProperty, TypeProperty)
from ..utils import get_dict
from ..utils import _get_dict
class ObservableProperty(Property):
@ -24,7 +25,11 @@ class ObservableProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
@ -49,7 +54,11 @@ class ExtensionsProperty(DictionaryProperty):
def clean(self, value):
try:
dictified = get_dict(value)
dictified = _get_dict(value)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
dictified = copy.deepcopy(dictified)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
@ -915,7 +924,12 @@ def parse_observable(data, _valid_refs=None, allow_custom=False):
An instantiated Python STIX Cyber Observable object.
"""
obj = get_dict(data)
obj = _get_dict(data)
# get deep copy since we are going modify the dict and might
# modify the original dict as _get_dict() does not return new
# dict when passed a dict
obj = copy.deepcopy(obj)
obj['_valid_refs'] = _valid_refs or []
if 'type' not in obj:

View File

@ -48,7 +48,7 @@ from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # n
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
WindowsServiceExt, X509Certificate, X509V3ExtenstionsType)
from .datastore.filters import _assemble_filters
from .datastore.filters import FilterSet
# Use an implicit MemoryStore
_environ = Environment(store=MemoryStore())
@ -156,7 +156,8 @@ def attack_patterns(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'attack-pattern')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'attack-pattern'))
return query(filter_list)
@ -168,7 +169,8 @@ def campaigns(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'campaign')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'campaign'))
return query(filter_list)
@ -180,7 +182,8 @@ def courses_of_action(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'course-of-action')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'course-of-action'))
return query(filter_list)
@ -192,7 +195,8 @@ def identities(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'identity')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'identity'))
return query(filter_list)
@ -204,7 +208,8 @@ def indicators(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'indicator')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'indicator'))
return query(filter_list)
@ -216,7 +221,8 @@ def intrusion_sets(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'intrusion-set')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'intrusion-set'))
return query(filter_list)
@ -228,7 +234,8 @@ def malware(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'malware')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'malware'))
return query(filter_list)
@ -240,7 +247,8 @@ def observed_data(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'observed-data')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'observed-data'))
return query(filter_list)
@ -252,7 +260,8 @@ def reports(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'report')])
filter_list = FilterSet(filters)
filter_list.add([Filter('type', '=', 'report')])
return query(filter_list)
@ -264,7 +273,8 @@ def threat_actors(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'threat-actor')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'threat-actor'))
return query(filter_list)
@ -276,7 +286,8 @@ def tools(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'tool')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'tool'))
return query(filter_list)
@ -288,5 +299,6 @@ def vulnerabilities(filters=None):
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'vulnerability')])
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'vulnerability'))
return query(filter_list)