WIP:allow unknown custom objects to be processed by parse; WIP: splitting up parse utility into components; found bug in tests that wasnt providing for proper teardown cleaning, fixed

stix2.0
= 2018-03-29 11:49:30 -04:00
parent e92db2417a
commit 89cf4bc38f
2 changed files with 52 additions and 13 deletions

View File

@ -77,14 +77,36 @@ def parse(data, allow_custom=False, version=None):
Args: Args:
data (str, dict, file-like object): The STIX 2 content to be parsed. data (str, dict, file-like object): The STIX 2 content to be parsed.
allow_custom (bool): Whether to allow custom properties or not. allow_custom (bool): Whether to allow custom properties as well unknown
Default: False. custom objects. Note that unknown custom objects cannot be parsed
into STIX objects, and will be returned as is. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version. None, use latest version.
Returns: Returns:
An instantiated Python STIX object. An instantiated Python STIX object.
"""
# convert STIX object to dict, if not already
obj = get_dict(data)
# convert dict to full python-stix2 obj
obj = dict_to_stix2(obj, allow_custom, version)
return obj
def dict_to_stix2(stix_dict, allow_custom=False, version=None):
"""convert dictionary to full python-stix2 object
Args:
stix_dict (dict): a python dictionary of a STIX object
that (presumably) is semantically correct to be parsed
into a full python-stix2 obj
allow_custom (bool): Whether to allow custom properties as well unknown
custom objects. Note that unknown custom objects cannot be parsed
into STIX objects, and will be returned as is. Default: False.
""" """
if not version: if not version:
# Use latest version # Use latest version
@ -93,16 +115,20 @@ def parse(data, allow_custom=False, version=None):
v = 'v' + version.replace('.', '') v = 'v' + version.replace('.', '')
OBJ_MAP = STIX2_OBJ_MAPS[v] OBJ_MAP = STIX2_OBJ_MAPS[v]
obj = get_dict(data)
if 'type' not in obj: if 'type' not in stix_dict:
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))
try: try:
obj_class = OBJ_MAP[obj['type']] obj_class = OBJ_MAP[stix_dict['type']]
except KeyError: except KeyError:
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type']) if allow_custom:
return obj_class(allow_custom=allow_custom, **obj) # flag allows for unknown custom objects too, but will not
# be parsed into STIX object, returned as is
return stix_dict
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type'])
return obj_class(allow_custom=allow_custom, **stix_dict)
def _register_type(new_type, version=None): def _register_type(new_type, version=None):

View File

@ -136,6 +136,19 @@ def rel_mem_store():
yield MemoryStore(stix_objs) yield MemoryStore(stix_objs)
@pytest.fixture
def fs_mem_store(request, mem_store):
filename = 'memory_test/mem_store.json'
mem_store.save_to_file(filename)
def fin():
# teardown, excecuted regardless of exception
shutil.rmtree(os.path.dirname(filename))
request.addfinalizer(fin)
return filename
def test_memory_source_get(mem_source): def test_memory_source_get(mem_source):
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f") resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
@ -187,9 +200,11 @@ def test_memory_store_query_multiple_filters(mem_store):
assert len(resp) == 1 assert len(resp) == 1
def test_memory_store_save_load_file(mem_store): def test_memory_store_save_load_file(mem_store, fs_mem_store):
filename = 'memory_test/mem_store.json' filename = fs_mem_store # the fixture fs_mem_store yields filename where the memory store was written to
mem_store.save_to_file(filename)
# STIX2 contents of mem_store have already been written to file
# (this is done in fixture 'fs_mem_store'), so can already read-in here
contents = open(os.path.abspath(filename)).read() contents = open(os.path.abspath(filename)).read()
assert '"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",' in contents assert '"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",' in contents
@ -200,8 +215,6 @@ def test_memory_store_save_load_file(mem_store):
assert mem_store2.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f") assert mem_store2.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert mem_store2.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") assert mem_store2.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
shutil.rmtree(os.path.dirname(filename))
def test_memory_store_add_invalid_object(mem_store): def test_memory_store_add_invalid_object(mem_store):
ind = ('indicator', IND1) # tuple isn't valid ind = ('indicator', IND1) # tuple isn't valid