Merge remote-tracking branch 'origin/master' into stix2.1
commit
2e0dfc6592
|
@ -60,8 +60,7 @@ To parse a STIX JSON string into a Python STIX object, use
|
||||||
"malicious-activity"
|
"malicious-activity"
|
||||||
],
|
],
|
||||||
"name": "File hash for malware variant",
|
"name": "File hash for malware variant",
|
||||||
"pattern": "[file:hashes.md5 =
|
"pattern": "[file:hashes.md5 ='d41d8cd98f00b204e9800998ecf8427e']",
|
||||||
'd41d8cd98f00b204e9800998ecf8427e']",
|
|
||||||
"valid_from": "2017-09-26T23:33:39.829952Z"
|
"valid_from": "2017-09-26T23:33:39.829952Z"
|
||||||
}""")
|
}""")
|
||||||
print(indicator)
|
print(indicator)
|
||||||
|
|
|
@ -185,7 +185,13 @@ class _STIXBase(collections.Mapping):
|
||||||
|
|
||||||
# Handle attribute access just like key access
|
# Handle attribute access just like key access
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
if name in self:
|
# Pickle-proofing: pickle invokes this on uninitialized instances (i.e.
|
||||||
|
# __init__ has not run). So no "self" attributes are set yet. The
|
||||||
|
# usual behavior of this method reads an __init__-assigned attribute,
|
||||||
|
# which would cause infinite recursion. So this check disables all
|
||||||
|
# attribute reads until the instance has been properly initialized.
|
||||||
|
unpickling = "_inner" not in self.__dict__
|
||||||
|
if not unpickling and name in self:
|
||||||
return self.__getitem__(name)
|
return self.__getitem__(name)
|
||||||
raise AttributeError("'%s' object has no attribute '%s'" %
|
raise AttributeError("'%s' object has no attribute '%s'" %
|
||||||
(self.__class__.__name__, name))
|
(self.__class__.__name__, name))
|
||||||
|
@ -236,6 +242,21 @@ class _STIXBase(collections.Mapping):
|
||||||
optional properties set to the default value defined in the spec.
|
optional properties set to the default value defined in the spec.
|
||||||
**kwargs: The arguments for a json.dumps() call.
|
**kwargs: The arguments for a json.dumps() call.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
>>> import stix2
|
||||||
|
>>> identity = stix2.Identity(name='Example Corp.', identity_class='organization')
|
||||||
|
>>> print(identity.serialize(sort_keys=True))
|
||||||
|
{"created": "2018-06-08T19:03:54.066Z", ... "name": "Example Corp.", "type": "identity"}
|
||||||
|
>>> print(identity.serialize(sort_keys=True, indent=4))
|
||||||
|
{
|
||||||
|
"created": "2018-06-08T19:03:54.066Z",
|
||||||
|
"id": "identity--d7f3e25a-ba1c-447a-ab71-6434b092b05e",
|
||||||
|
"identity_class": "organization",
|
||||||
|
"modified": "2018-06-08T19:03:54.066Z",
|
||||||
|
"name": "Example Corp.",
|
||||||
|
"type": "identity"
|
||||||
|
}
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
str: The serialized JSON object.
|
str: The serialized JSON object.
|
||||||
|
|
||||||
|
|
|
@ -860,3 +860,33 @@ def test_register_custom_object():
|
||||||
def test_extension_property_location():
|
def test_extension_property_location():
|
||||||
assert 'extensions' in stix2.v21.observables.OBJ_MAP_OBSERVABLE['x-new-observable']._properties
|
assert 'extensions' in stix2.v21.observables.OBJ_MAP_OBSERVABLE['x-new-observable']._properties
|
||||||
assert 'extensions' not in stix2.v21.observables.EXT_MAP['domain-name']['x-new-ext']._properties
|
assert 'extensions' not in stix2.v21.observables.EXT_MAP['domain-name']['x-new-ext']._properties
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("data", [
|
||||||
|
"""{
|
||||||
|
"type": "x-example",
|
||||||
|
"id": "x-example--336d8a9f-91f1-46c5-b142-6441bb9f8b8d",
|
||||||
|
"created": "2018-06-12T16:20:58.059Z",
|
||||||
|
"modified": "2018-06-12T16:20:58.059Z",
|
||||||
|
"dictionary": {
|
||||||
|
"key": {
|
||||||
|
"key_a": "value",
|
||||||
|
"key_b": "value"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}""",
|
||||||
|
])
|
||||||
|
def test_custom_object_nested_dictionary(data):
|
||||||
|
@stix2.sdo.CustomObject('x-example', [
|
||||||
|
('dictionary', stix2.properties.DictionaryProperty()),
|
||||||
|
])
|
||||||
|
class Example(object):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
example = Example(id='x-example--336d8a9f-91f1-46c5-b142-6441bb9f8b8d',
|
||||||
|
created='2018-06-12T16:20:58.059Z',
|
||||||
|
modified='2018-06-12T16:20:58.059Z',
|
||||||
|
dictionary={'key': {'key_b': 'value', 'key_a': 'value'}})
|
||||||
|
|
||||||
|
assert data == str(example)
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
import pickle
|
||||||
|
|
||||||
|
import stix2
|
||||||
|
|
||||||
|
|
||||||
|
def test_pickling():
|
||||||
|
"""
|
||||||
|
Ensure a pickle/unpickle cycle works okay.
|
||||||
|
"""
|
||||||
|
identity = stix2.Identity(
|
||||||
|
id="identity--d66cb89d-5228-4983-958c-fa84ef75c88c",
|
||||||
|
name="alice",
|
||||||
|
description="this is a pickle test",
|
||||||
|
identity_class="some_class"
|
||||||
|
)
|
||||||
|
|
||||||
|
pickle.loads(pickle.dumps(identity))
|
|
@ -1,3 +1,5 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
|
|
||||||
|
@ -10,7 +12,7 @@ amsterdam = pytz.timezone('Europe/Amsterdam')
|
||||||
eastern = pytz.timezone('US/Eastern')
|
eastern = pytz.timezone('US/Eastern')
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('dttm,timestamp', [
|
@pytest.mark.parametrize('dttm, timestamp', [
|
||||||
(dt.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'),
|
(dt.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'),
|
||||||
(amsterdam.localize(dt.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'),
|
(amsterdam.localize(dt.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'),
|
||||||
(eastern.localize(dt.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'),
|
(eastern.localize(dt.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'),
|
||||||
|
@ -76,12 +78,12 @@ def test_get_dict_invalid(data):
|
||||||
stix2.utils._get_dict(data)
|
stix2.utils._get_dict(data)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('stix_id, typ', [
|
@pytest.mark.parametrize('stix_id, type', [
|
||||||
('malware--d69c8146-ab35-4d50-8382-6fc80e641d43', 'malware'),
|
('malware--d69c8146-ab35-4d50-8382-6fc80e641d43', 'malware'),
|
||||||
('intrusion-set--899ce53f-13a0-479b-a0e4-67d46e241542', 'intrusion-set')
|
('intrusion-set--899ce53f-13a0-479b-a0e4-67d46e241542', 'intrusion-set')
|
||||||
])
|
])
|
||||||
def test_get_type_from_id(stix_id, typ):
|
def test_get_type_from_id(stix_id, type):
|
||||||
assert stix2.utils.get_type_from_id(stix_id) == typ
|
assert stix2.utils.get_type_from_id(stix_id) == type
|
||||||
|
|
||||||
|
|
||||||
def test_deduplicate(stix_objs1):
|
def test_deduplicate(stix_objs1):
|
||||||
|
@ -100,3 +102,110 @@ def test_deduplicate(stix_objs1):
|
||||||
assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
|
assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
|
||||||
assert "2017-01-27T13:49:53.935Z" in mods
|
assert "2017-01-27T13:49:53.935Z" in mods
|
||||||
assert "2017-01-27T13:49:53.936Z" in mods
|
assert "2017-01-27T13:49:53.936Z" in mods
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('object, tuple_to_find, expected_index', [
|
||||||
|
(stix2.ObservedData(
|
||||||
|
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||||
|
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||||
|
created="2016-04-06T19:58:16.000Z",
|
||||||
|
modified="2016-04-06T19:58:16.000Z",
|
||||||
|
first_observed="2015-12-21T19:00:00Z",
|
||||||
|
last_observed="2015-12-21T19:00:00Z",
|
||||||
|
number_observed=50,
|
||||||
|
objects={
|
||||||
|
"0": {
|
||||||
|
"name": "foo.exe",
|
||||||
|
"type": "file"
|
||||||
|
},
|
||||||
|
"1": {
|
||||||
|
"type": "ipv4-addr",
|
||||||
|
"value": "198.51.100.3"
|
||||||
|
},
|
||||||
|
"2": {
|
||||||
|
"type": "network-traffic",
|
||||||
|
"src_ref": "1",
|
||||||
|
"protocols": [
|
||||||
|
"tcp",
|
||||||
|
"http"
|
||||||
|
],
|
||||||
|
"extensions": {
|
||||||
|
"http-request-ext": {
|
||||||
|
"request_method": "get",
|
||||||
|
"request_value": "/download.html",
|
||||||
|
"request_version": "http/1.1",
|
||||||
|
"request_header": {
|
||||||
|
"Accept-Encoding": "gzip,deflate",
|
||||||
|
"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113",
|
||||||
|
"Host": "www.example.com"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
), ('1', {"type": "ipv4-addr", "value": "198.51.100.3"}), 1),
|
||||||
|
({
|
||||||
|
"type": "x-example",
|
||||||
|
"id": "x-example--d5413db2-c26c-42e0-b0e0-ec800a310bfb",
|
||||||
|
"created": "2018-06-11T01:25:22.063Z",
|
||||||
|
"modified": "2018-06-11T01:25:22.063Z",
|
||||||
|
"dictionary": {
|
||||||
|
"key": {
|
||||||
|
"key_one": "value",
|
||||||
|
"key_two": "value"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, ('key', {'key_one': 'value', 'key_two': 'value'}), 0),
|
||||||
|
({
|
||||||
|
"type": "language-content",
|
||||||
|
"id": "language-content--b86bd89f-98bb-4fa9-8cb2-9ad421da981d",
|
||||||
|
"created": "2017-02-08T21:31:22.007Z",
|
||||||
|
"modified": "2017-02-08T21:31:22.007Z",
|
||||||
|
"object_ref": "campaign--12a111f0-b824-4baf-a224-83b80237a094",
|
||||||
|
"object_modified": "2017-02-08T21:31:22.007Z",
|
||||||
|
"contents": {
|
||||||
|
"de": {
|
||||||
|
"name": "Bank Angriff 1",
|
||||||
|
"description": "Weitere Informationen über Banküberfall"
|
||||||
|
},
|
||||||
|
"fr": {
|
||||||
|
"name": "Attaque Bank 1",
|
||||||
|
"description": "Plus d'informations sur la crise bancaire"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, ('fr', {"name": "Attaque Bank 1", "description": "Plus d'informations sur la crise bancaire"}), 1)
|
||||||
|
])
|
||||||
|
def test_find_property_index(object, tuple_to_find, expected_index):
|
||||||
|
assert stix2.utils.find_property_index(
|
||||||
|
object,
|
||||||
|
[],
|
||||||
|
tuple_to_find
|
||||||
|
) == expected_index
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('dict_value, tuple_to_find, expected_index', [
|
||||||
|
({
|
||||||
|
"contents": {
|
||||||
|
"de": {
|
||||||
|
"name": "Bank Angriff 1",
|
||||||
|
"description": "Weitere Informationen über Banküberfall"
|
||||||
|
},
|
||||||
|
"fr": {
|
||||||
|
"name": "Attaque Bank 1",
|
||||||
|
"description": "Plus d'informations sur la crise bancaire"
|
||||||
|
},
|
||||||
|
"es": {
|
||||||
|
"name": "Ataque al Banco",
|
||||||
|
"description": "Mas informacion sobre el ataque al banco"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, ('es', {"name": "Ataque al Banco", "description": "Mas informacion sobre el ataque al banco"}), 1), # Sorted alphabetically
|
||||||
|
({
|
||||||
|
'my_list': [
|
||||||
|
{"key_one": 1},
|
||||||
|
{"key_two": 2}
|
||||||
|
]
|
||||||
|
}, ('key_one', 1), 0)
|
||||||
|
])
|
||||||
|
def test_iterate_over_values(dict_value, tuple_to_find, expected_index):
|
||||||
|
assert stix2.utils._iterate_over_values(dict_value.values(), tuple_to_find) == expected_index
|
||||||
|
|
109
stix2/utils.py
109
stix2/utils.py
|
@ -165,48 +165,87 @@ def _get_dict(data):
|
||||||
raise ValueError("Cannot convert '%s' to dictionary." % str(data))
|
raise ValueError("Cannot convert '%s' to dictionary." % str(data))
|
||||||
|
|
||||||
|
|
||||||
|
def _iterate_over_values(dict_values, tuple_to_find):
|
||||||
|
"""Loop recursively over dictionary values"""
|
||||||
|
from .base import _STIXBase
|
||||||
|
for pv in dict_values:
|
||||||
|
if isinstance(pv, list):
|
||||||
|
for item in pv:
|
||||||
|
if isinstance(item, _STIXBase):
|
||||||
|
index = find_property_index(
|
||||||
|
item,
|
||||||
|
item.object_properties(),
|
||||||
|
tuple_to_find
|
||||||
|
)
|
||||||
|
if index is not None:
|
||||||
|
return index
|
||||||
|
elif isinstance(item, dict):
|
||||||
|
for idx, val in enumerate(sorted(item)):
|
||||||
|
if (tuple_to_find[0] == val and
|
||||||
|
item.get(val) == tuple_to_find[1]):
|
||||||
|
return idx
|
||||||
|
elif isinstance(pv, dict):
|
||||||
|
if pv.get(tuple_to_find[0]) is not None:
|
||||||
|
for idx, item in enumerate(sorted(pv.keys())):
|
||||||
|
if ((item == tuple_to_find[0] and str.isdigit(item)) and
|
||||||
|
(pv[item] == tuple_to_find[1])):
|
||||||
|
return int(tuple_to_find[0])
|
||||||
|
elif pv[item] == tuple_to_find[1]:
|
||||||
|
return idx
|
||||||
|
for item in pv.values():
|
||||||
|
if isinstance(item, _STIXBase):
|
||||||
|
index = find_property_index(
|
||||||
|
item,
|
||||||
|
item.object_properties(),
|
||||||
|
tuple_to_find
|
||||||
|
)
|
||||||
|
if index is not None:
|
||||||
|
return index
|
||||||
|
elif isinstance(item, dict):
|
||||||
|
index = find_property_index(
|
||||||
|
item,
|
||||||
|
item.keys(),
|
||||||
|
tuple_to_find
|
||||||
|
)
|
||||||
|
if index is not None:
|
||||||
|
return index
|
||||||
|
|
||||||
|
|
||||||
def find_property_index(obj, properties, tuple_to_find):
|
def find_property_index(obj, properties, tuple_to_find):
|
||||||
"""Recursively find the property in the object model, return the index
|
"""Recursively find the property in the object model, return the index
|
||||||
according to the _properties OrderedDict. If it's a list look for
|
according to the ``properties`` OrderedDict when working with `stix2`
|
||||||
individual objects. Returns and integer indicating its location
|
objects. If it's a list look for individual objects. Returns and integer
|
||||||
|
indicating its location.
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
This method is intended to pretty print `stix2` properties for better
|
||||||
|
visual feedback when working with the library.
|
||||||
|
|
||||||
|
Warnings:
|
||||||
|
This method may not be able to produce the same output if called
|
||||||
|
multiple times and makes a best effort attempt to print the properties
|
||||||
|
according to the STIX technical specification.
|
||||||
|
|
||||||
|
See Also:
|
||||||
|
py:meth:`stix2.base._STIXBase.serialize` for more information.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from .base import _STIXBase
|
from .base import _STIXBase
|
||||||
try:
|
try:
|
||||||
if tuple_to_find[1] in obj._inner.values():
|
if isinstance(obj, _STIXBase):
|
||||||
return properties.index(tuple_to_find[0])
|
if tuple_to_find[1] in obj._inner.values():
|
||||||
|
return properties.index(tuple_to_find[0])
|
||||||
|
elif isinstance(obj, dict):
|
||||||
|
for idx, val in enumerate(sorted(obj)):
|
||||||
|
if (tuple_to_find[0] == val and
|
||||||
|
obj.get(val) == tuple_to_find[1]):
|
||||||
|
return idx
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except ValueError:
|
except ValueError:
|
||||||
for pv in obj._inner.values():
|
if isinstance(obj, _STIXBase):
|
||||||
if isinstance(pv, list):
|
return _iterate_over_values(obj._inner.values(), tuple_to_find)
|
||||||
for item in pv:
|
elif isinstance(obj, dict):
|
||||||
if isinstance(item, _STIXBase):
|
return _iterate_over_values(obj.values(), tuple_to_find)
|
||||||
val = find_property_index(item,
|
|
||||||
item.object_properties(),
|
|
||||||
tuple_to_find)
|
|
||||||
if val is not None:
|
|
||||||
return val
|
|
||||||
elif isinstance(item, dict):
|
|
||||||
for idx, val in enumerate(sorted(item)):
|
|
||||||
if (tuple_to_find[0] == val and
|
|
||||||
item.get(val) == tuple_to_find[1]):
|
|
||||||
return idx
|
|
||||||
elif isinstance(pv, dict):
|
|
||||||
if pv.get(tuple_to_find[0]) is not None:
|
|
||||||
try:
|
|
||||||
return int(tuple_to_find[0])
|
|
||||||
except ValueError:
|
|
||||||
return len(tuple_to_find[0])
|
|
||||||
for item in pv.values():
|
|
||||||
if isinstance(item, _STIXBase):
|
|
||||||
val = find_property_index(item,
|
|
||||||
item.object_properties(),
|
|
||||||
tuple_to_find)
|
|
||||||
if val is not None:
|
|
||||||
return val
|
|
||||||
elif isinstance(item, dict) and tuple_to_find[0] in item:
|
|
||||||
for num, t in enumerate(item.keys(), start=1):
|
|
||||||
if t == tuple_to_find[0]:
|
|
||||||
return num
|
|
||||||
|
|
||||||
|
|
||||||
def new_version(data, **kwargs):
|
def new_version(data, **kwargs):
|
||||||
|
|
Loading…
Reference in New Issue