Merge pull request #185 from LiamSennitt/master
fix custom STIX objects with nested dictionary error #184stix2.0
commit
858a9752df
|
@ -242,6 +242,21 @@ class _STIXBase(collections.Mapping):
|
|||
optional properties set to the default value defined in the spec.
|
||||
**kwargs: The arguments for a json.dumps() call.
|
||||
|
||||
Examples:
|
||||
>>> import stix2
|
||||
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
|
||||
>>> print(identity.serialize(sort_keys=True))
|
||||
{"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
|
||||
>>> print(identity.serialize(sort_keys=True, indent=4))
|
||||
{
|
||||
"created": "2018-06-08T19:03:54.066Z",
|
||||
"id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
|
||||
"identity_class": "organization",
|
||||
"modified": "2018-06-08T19:03:54.066Z",
|
||||
"name": "Example Corp.",
|
||||
"type": "identity"
|
||||
}
|
||||
|
||||
Returns:
|
||||
str: The serialized JSON object.
|
||||
|
||||
|
|
|
@ -860,3 +860,33 @@ def test_register_custom_object():
|
|||
def test_extension_property_location():
|
||||
assert 'extensions' in stix2.v20.observables.OBJ_MAP_OBSERVABLE['x-new-observable']._properties
|
||||
assert 'extensions' not in stix2.v20.observables.EXT_MAP['domain-name']['x-new-ext']._properties
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data", [
|
||||
"""{
|
||||
"type": "x-example",
|
||||
"id": "x-example--336d8a9f-91f1-46c5-b142-6441bb9f8b8d",
|
||||
"created": "2018-06-12T16:20:58.059Z",
|
||||
"modified": "2018-06-12T16:20:58.059Z",
|
||||
"dictionary": {
|
||||
"key": {
|
||||
"key_a": "value",
|
||||
"key_b": "value"
|
||||
}
|
||||
}
|
||||
}""",
|
||||
])
|
||||
def test_custom_object_nested_dictionary(data):
|
||||
@stix2.sdo.CustomObject('x-example', [
|
||||
('dictionary', stix2.properties.DictionaryProperty()),
|
||||
])
|
||||
class Example(object):
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
|
||||
example = Example(id='x-example--336d8a9f-91f1-46c5-b142-6441bb9f8b8d',
|
||||
created='2018-06-12T16:20:58.059Z',
|
||||
modified='2018-06-12T16:20:58.059Z',
|
||||
dictionary={'key': {'key_b': 'value', 'key_a': 'value'}})
|
||||
|
||||
assert data == str(example)
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import datetime as dt
|
||||
from io import StringIO
|
||||
|
||||
|
@ -10,7 +12,7 @@ amsterdam = pytz.timezone('Europe/Amsterdam')
|
|||
eastern = pytz.timezone('US/Eastern')
|
||||
|
||||
|
||||
@pytest.mark.parametrize('dttm,timestamp', [
|
||||
@pytest.mark.parametrize('dttm, timestamp', [
|
||||
(dt.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'),
|
||||
(amsterdam.localize(dt.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'),
|
||||
(eastern.localize(dt.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'),
|
||||
|
@ -76,12 +78,12 @@ def test_get_dict_invalid(data):
|
|||
stix2.utils._get_dict(data)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('stix_id, typ', [
|
||||
@pytest.mark.parametrize('stix_id, type', [
|
||||
('malware--d69c8146-ab35-4d50-8382-6fc80e641d43', 'malware'),
|
||||
('intrusion-set--899ce53f-13a0-479b-a0e4-67d46e241542', 'intrusion-set')
|
||||
])
|
||||
def test_get_type_from_id(stix_id, typ):
|
||||
assert stix2.utils.get_type_from_id(stix_id) == typ
|
||||
def test_get_type_from_id(stix_id, type):
|
||||
assert stix2.utils.get_type_from_id(stix_id) == type
|
||||
|
||||
|
||||
def test_deduplicate(stix_objs1):
|
||||
|
@ -100,3 +102,110 @@ def test_deduplicate(stix_objs1):
|
|||
assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
|
||||
assert "2017-01-27T13:49:53.935Z" in mods
|
||||
assert "2017-01-27T13:49:53.936Z" in mods
|
||||
|
||||
|
||||
@pytest.mark.parametrize('object, tuple_to_find, expected_index', [
|
||||
(stix2.ObservedData(
|
||||
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
created="2016-04-06T19:58:16.000Z",
|
||||
modified="2016-04-06T19:58:16.000Z",
|
||||
first_observed="2015-12-21T19:00:00Z",
|
||||
last_observed="2015-12-21T19:00:00Z",
|
||||
number_observed=50,
|
||||
objects={
|
||||
"0": {
|
||||
"name": "foo.exe",
|
||||
"type": "file"
|
||||
},
|
||||
"1": {
|
||||
"type": "ipv4-addr",
|
||||
"value": "198.51.100.3"
|
||||
},
|
||||
"2": {
|
||||
"type": "network-traffic",
|
||||
"src_ref": "1",
|
||||
"protocols": [
|
||||
"tcp",
|
||||
"http"
|
||||
],
|
||||
"extensions": {
|
||||
"http-request-ext": {
|
||||
"request_method": "get",
|
||||
"request_value": "/download.html",
|
||||
"request_version": "http/1.1",
|
||||
"request_header": {
|
||||
"Accept-Encoding": "gzip,deflate",
|
||||
"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113",
|
||||
"Host": "www.example.com"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
), ('1', {"type": "ipv4-addr", "value": "198.51.100.3"}), 1),
|
||||
({
|
||||
"type": "x-example",
|
||||
"id": "x-example--d5413db2-c26c-42e0-b0e0-ec800a310bfb",
|
||||
"created": "2018-06-11T01:25:22.063Z",
|
||||
"modified": "2018-06-11T01:25:22.063Z",
|
||||
"dictionary": {
|
||||
"key": {
|
||||
"key_one": "value",
|
||||
"key_two": "value"
|
||||
}
|
||||
}
|
||||
}, ('key', {'key_one': 'value', 'key_two': 'value'}), 0),
|
||||
({
|
||||
"type": "language-content",
|
||||
"id": "language-content--b86bd89f-98bb-4fa9-8cb2-9ad421da981d",
|
||||
"created": "2017-02-08T21:31:22.007Z",
|
||||
"modified": "2017-02-08T21:31:22.007Z",
|
||||
"object_ref": "campaign--12a111f0-b824-4baf-a224-83b80237a094",
|
||||
"object_modified": "2017-02-08T21:31:22.007Z",
|
||||
"contents": {
|
||||
"de": {
|
||||
"name": "Bank Angriff 1",
|
||||
"description": "Weitere Informationen über Banküberfall"
|
||||
},
|
||||
"fr": {
|
||||
"name": "Attaque Bank 1",
|
||||
"description": "Plus d'informations sur la crise bancaire"
|
||||
}
|
||||
}
|
||||
}, ('fr', {"name": "Attaque Bank 1", "description": "Plus d'informations sur la crise bancaire"}), 1)
|
||||
])
|
||||
def test_find_property_index(object, tuple_to_find, expected_index):
|
||||
assert stix2.utils.find_property_index(
|
||||
object,
|
||||
[],
|
||||
tuple_to_find
|
||||
) == expected_index
|
||||
|
||||
|
||||
@pytest.mark.parametrize('dict_value, tuple_to_find, expected_index', [
|
||||
({
|
||||
"contents": {
|
||||
"de": {
|
||||
"name": "Bank Angriff 1",
|
||||
"description": "Weitere Informationen über Banküberfall"
|
||||
},
|
||||
"fr": {
|
||||
"name": "Attaque Bank 1",
|
||||
"description": "Plus d'informations sur la crise bancaire"
|
||||
},
|
||||
"es": {
|
||||
"name": "Ataque al Banco",
|
||||
"description": "Mas informacion sobre el ataque al banco"
|
||||
}
|
||||
}
|
||||
}, ('es', {"name": "Ataque al Banco", "description": "Mas informacion sobre el ataque al banco"}), 1), # Sorted alphabetically
|
||||
({
|
||||
'my_list': [
|
||||
{"key_one": 1},
|
||||
{"key_two": 2}
|
||||
]
|
||||
}, ('key_one', 1), 0)
|
||||
])
|
||||
def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
|
||||
assert stix2.utils._iterate_over_values(dict_value.values(), tuple_to_find) == expected_index
|
||||
|
|
105
stix2/utils.py
105
stix2/utils.py
|
@ -165,44 +165,87 @@ def _get_dict(data):
|
|||
raise ValueError("Cannot convert '%s' to dictionary." % str(data))
|
||||
|
||||
|
||||
def _iterate_over_values(dict_values, tuple_to_find):
|
||||
"""Loop recursively over dictionary values"""
|
||||
from .base import _STIXBase
|
||||
for pv in dict_values:
|
||||
if isinstance(pv, list):
|
||||
for item in pv:
|
||||
if isinstance(item, _STIXBase):
|
||||
index = find_property_index(
|
||||
item,
|
||||
item.object_properties(),
|
||||
tuple_to_find
|
||||
)
|
||||
if index is not None:
|
||||
return index
|
||||
elif isinstance(item, dict):
|
||||
for idx, val in enumerate(sorted(item)):
|
||||
if (tuple_to_find[0] == val and
|
||||
item.get(val) == tuple_to_find[1]):
|
||||
return idx
|
||||
elif isinstance(pv, dict):
|
||||
if pv.get(tuple_to_find[0]) is not None:
|
||||
for idx, item in enumerate(sorted(pv.keys())):
|
||||
if ((item == tuple_to_find[0] and str.isdigit(item)) and
|
||||
(pv[item] == tuple_to_find[1])):
|
||||
return int(tuple_to_find[0])
|
||||
elif pv[item] == tuple_to_find[1]:
|
||||
return idx
|
||||
for item in pv.values():
|
||||
if isinstance(item, _STIXBase):
|
||||
index = find_property_index(
|
||||
item,
|
||||
item.object_properties(),
|
||||
tuple_to_find
|
||||
)
|
||||
if index is not None:
|
||||
return index
|
||||
elif isinstance(item, dict):
|
||||
index = find_property_index(
|
||||
item,
|
||||
item.keys(),
|
||||
tuple_to_find
|
||||
)
|
||||
if index is not None:
|
||||
return index
|
||||
|
||||
|
||||
def find_property_index(obj, properties, tuple_to_find):
|
||||
"""Recursively find the property in the object model, return the index
|
||||
according to the _properties OrderedDict. If it's a list look for
|
||||
individual objects. Returns and integer indicating its location
|
||||
according to the ``properties`` OrderedDict when working with `stix2`
|
||||
objects. If it's a list look for individual objects. Returns and integer
|
||||
indicating its location.
|
||||
|
||||
Notes:
|
||||
This method is intended to pretty print `stix2` properties for better
|
||||
visual feedback when working with the library.
|
||||
|
||||
Warnings:
|
||||
This method may not be able to produce the same output if called
|
||||
multiple times and makes a best effort attempt to print the properties
|
||||
according to the STIX technical specification.
|
||||
|
||||
See Also:
|
||||
py:meth:`stix2.base._STIXBase.serialize` for more information.
|
||||
|
||||
"""
|
||||
from .base import _STIXBase
|
||||
try:
|
||||
if tuple_to_find[1] in obj._inner.values():
|
||||
return properties.index(tuple_to_find[0])
|
||||
if isinstance(obj, _STIXBase):
|
||||
if tuple_to_find[1] in obj._inner.values():
|
||||
return properties.index(tuple_to_find[0])
|
||||
elif isinstance(obj, dict):
|
||||
for idx, val in enumerate(sorted(obj)):
|
||||
if (tuple_to_find[0] == val and
|
||||
obj.get(val) == tuple_to_find[1]):
|
||||
return idx
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
for pv in obj._inner.values():
|
||||
if isinstance(pv, list):
|
||||
for item in pv:
|
||||
if isinstance(item, _STIXBase):
|
||||
val = find_property_index(item,
|
||||
item.object_properties(),
|
||||
tuple_to_find)
|
||||
if val is not None:
|
||||
return val
|
||||
elif isinstance(item, dict):
|
||||
for idx, val in enumerate(sorted(item)):
|
||||
if (tuple_to_find[0] == val and
|
||||
item.get(val) == tuple_to_find[1]):
|
||||
return idx
|
||||
elif isinstance(pv, dict):
|
||||
if pv.get(tuple_to_find[0]) is not None:
|
||||
try:
|
||||
return int(tuple_to_find[0])
|
||||
except ValueError:
|
||||
return len(tuple_to_find[0])
|
||||
for item in pv.values():
|
||||
if isinstance(item, _STIXBase):
|
||||
val = find_property_index(item,
|
||||
item.object_properties(),
|
||||
tuple_to_find)
|
||||
if val is not None:
|
||||
return val
|
||||
if isinstance(obj, _STIXBase):
|
||||
return _iterate_over_values(obj._inner.values(), tuple_to_find)
|
||||
elif isinstance(obj, dict):
|
||||
return _iterate_over_values(obj.values(), tuple_to_find)
|
||||
|
||||
|
||||
def new_version(data, **kwargs):
|
||||
|
|
Loading…
Reference in New Issue