From 89cf4bc38f6b65769a6a989880124ab0905b8626 Mon Sep 17 00:00:00 2001 From: = Date: Thu, 29 Mar 2018 11:49:30 -0400 Subject: [PATCH 1/2] WIP:allow unknown custom objects to be processed by parse; WIP: splitting up parse utility into components; found bug in tests that wasnt providing for proper teardown cleaning, fixed --- stix2/core.py | 42 +++++++++++++++++++++++++++++++-------- stix2/test/test_memory.py | 23 ++++++++++++++++----- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/stix2/core.py b/stix2/core.py index b6d295d..64307ff 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -77,14 +77,36 @@ def parse(data, allow_custom=False, version=None): Args: data (str, dict, file-like object): The STIX 2 content to be parsed. - allow_custom (bool): Whether to allow custom properties or not. - Default: False. + allow_custom (bool): Whether to allow custom properties as well unknown + custom objects. Note that unknown custom objects cannot be parsed + into STIX objects, and will be returned as is. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. Returns: An instantiated Python STIX object. + """ + # convert STIX object to dict, if not already + obj = get_dict(data) + + # convert dict to full python-stix2 obj + obj = dict_to_stix2(obj, allow_custom, version) + + return obj + + +def dict_to_stix2(stix_dict, allow_custom=False, version=None): + """convert dictionary to full python-stix2 object + + Args: + stix_dict (dict): a python dictionary of a STIX object + that (presumably) is semantically correct to be parsed + into a full python-stix2 obj + allow_custom (bool): Whether to allow custom properties as well unknown + custom objects. Note that unknown custom objects cannot be parsed + into STIX objects, and will be returned as is. Default: False. 
+ """ if not version: # Use latest version @@ -93,16 +115,20 @@ def parse(data, allow_custom=False, version=None): v = 'v' + version.replace('.', '') OBJ_MAP = STIX2_OBJ_MAPS[v] - obj = get_dict(data) - if 'type' not in obj: - raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) + if 'type' not in stix_dict: + raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict)) try: - obj_class = OBJ_MAP[obj['type']] + obj_class = OBJ_MAP[stix_dict['type']] except KeyError: - raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type']) - return obj_class(allow_custom=allow_custom, **obj) + if allow_custom: + # flag allows for unknown custom objects too, but will not + # be parsed into STIX object, returned as is + return stix_dict + raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % stix_dict['type']) + + return obj_class(allow_custom=allow_custom, **stix_dict) def _register_type(new_type, version=None): diff --git a/stix2/test/test_memory.py b/stix2/test/test_memory.py index 2384848..284c43e 100644 --- a/stix2/test/test_memory.py +++ b/stix2/test/test_memory.py @@ -136,6 +136,19 @@ def rel_mem_store(): yield MemoryStore(stix_objs) +@pytest.fixture +def fs_mem_store(request, mem_store): + filename = 'memory_test/mem_store.json' + mem_store.save_to_file(filename) + + def fin(): + # teardown, executed regardless of exception + shutil.rmtree(os.path.dirname(filename)) + request.addfinalizer(fin) + + return filename + + def test_memory_source_get(mem_source): resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f") assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" @@ -187,9 +200,11 @@ def test_memory_store_query_multiple_filters(mem_store): assert len(resp) == 1 -def test_memory_store_save_load_file(mem_store): - filename = 
'memory_test/mem_store.json' - mem_store.save_to_file(filename) +def test_memory_store_save_load_file(mem_store, fs_mem_store): + filename = fs_mem_store # the fixture fs_mem_store yields filename where the memory store was written to + + # STIX2 contents of mem_store have already been written to file + # (this is done in fixture 'fs_mem_store'), so can already read-in here contents = open(os.path.abspath(filename)).read() assert '"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",' in contents @@ -200,8 +215,6 @@ def test_memory_store_save_load_file(mem_store): assert mem_store2.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f") assert mem_store2.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f") - shutil.rmtree(os.path.dirname(filename)) - def test_memory_store_add_invalid_object(mem_store): ind = ('indicator', IND1) # tuple isn't valid From 90834c5b953c060a08bf866e437114b3bca3c4d4 Mon Sep 17 00:00:00 2001 From: = Date: Fri, 30 Mar 2018 13:21:07 -0400 Subject: [PATCH 2/2] docs and tests for parse() mod --- docs/guide/parsing.ipynb | 395 +++++++++++++++++++++++++++++++++++++- stix2/core.py | 19 +- stix2/test/test_custom.py | 14 ++ 3 files changed, 421 insertions(+), 7 deletions(-) diff --git a/docs/guide/parsing.ipynb b/docs/guide/parsing.ipynb index d24f994..b3460b3 100644 --- a/docs/guide/parsing.ipynb +++ b/docs/guide/parsing.ipynb @@ -63,21 +63,120 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Parsing STIX content is as easy as calling the [parse()](../api/stix2.core.rst#stix2.core.parse) function on a JSON string. It will automatically determine the type of the object. The STIX objects within `bundle` objects, and the cyber observables contained within `observed-data` objects will be parsed as well." + "Parsing STIX content is as easy as calling the [parse()](../api/stix2.core.rst#stix2.core.parse) function on a JSON string, dictionary, or file-like object. It will automatically determine the type of the object. 
The STIX objects within `bundle` objects, and the cyber observables contained within `observed-data` objects will be parsed as well.\n", + "\n", + "**Parsing a string**" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "observed-data\n", - "0969de02ecf8a5f003e3f6d063d848c8a193aada092623f8ce408c15bcb5f038\n" + "\n" ] + }, + { + "data": { + "text/html": [ + "
{\n",
+       "    "type": "observed-data",\n",
+       "    "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",\n",
+       "    "created": "2016-04-06T19:58:16.000Z",\n",
+       "    "modified": "2016-04-06T19:58:16.000Z",\n",
+       "    "first_observed": "2015-12-21T19:00:00Z",\n",
+       "    "last_observed": "2015-12-21T19:00:00Z",\n",
+       "    "number_observed": 50,\n",
+       "    "objects": {\n",
+       "        "0": {\n",
+       "            "type": "file",\n",
+       "            "hashes": {\n",
+       "                "SHA-256": "0969de02ecf8a5f003e3f6d063d848c8a193aada092623f8ce408c15bcb5f038"\n",
+       "            }\n",
+       "        }\n",
+       "    }\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ @@ -102,8 +201,292 @@ "}\"\"\"\n", "\n", "obj = parse(input_string)\n", - "print(obj.type)\n", - "print(obj.objects[\"0\"].hashes['SHA-256'])" + "print(type(obj))\n", + "print(obj)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Parsing a dictionary**" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+       "    "type": "identity",\n",
+       "    "id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c",\n",
+       "    "created": "2015-12-21T19:59:11.000Z",\n",
+       "    "modified": "2015-12-21T19:59:11.000Z",\n",
+       "    "name": "Cole Powers",\n",
+       "    "identity_class": "individual"\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "input_dict = {\n", + " \"type\": \"identity\",\n", + " \"id\": \"identity--311b2d2d-f010-5473-83ec-1edf84858f4c\",\n", + " \"created\": \"2015-12-21T19:59:11Z\",\n", + " \"modified\": \"2015-12-21T19:59:11Z\",\n", + " \"name\": \"Cole Powers\",\n", + " \"identity_class\": \"individual\"\n", + "}\n", + "\n", + "obj = parse(input_dict)\n", + "print(type(obj))\n", + "print(obj)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Parsing a file-like object**" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+       "    "type": "course-of-action",\n",
+       "    "id": "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd",\n",
+       "    "created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",\n",
+       "    "created": "2017-05-31T21:30:41.022Z",\n",
+       "    "modified": "2017-05-31T21:30:41.022Z",\n",
+       "    "name": "Data from Network Shared Drive Mitigation",\n",
+       "    "description": "Identify unnecessary system utilities or potentially malicious software that may be used to collect data from a network share, and audit and/or block them by using whitelisting[[CiteRef::Beechey 2010]] tools, like AppLocker,[[CiteRef::Windows Commands JPCERT]][[CiteRef::NSA MS AppLocker]] or Software Restriction Policies[[CiteRef::Corio 2008]] where appropriate.[[CiteRef::TechNet Applocker vs SRP]]"\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "file_handle = open(\"/home/michael/cti-python-stix2/stix2/test/stix2_data/course-of-action/course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd.json\")\n", + "\n", + "obj = parse(file_handle)\n", + "print(type(obj))\n", + "print(obj)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Parsing Custom STIX Content" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Parsing custom STIX objects and/or STIX objects with custom properties is also completed easily with [parse()](../api/stix2.core.rst#stix2.core.parse). Just supply the keyword argument *allow_custom=True*. When *allow_custom* is specified, [parse()](../api/stix2.core.rst#stix2.core.parse) will attempt to convert the supplied STIX content to known STIX2 domain objects and/or previously defined custom defined STIX2 objects. If the conversion cannot be completed (and *allow_custom* is specified), [parse()](../api/stix2.core.rst#stix2.core.parse) will treat the supplied STIX2 content as valid STIX2 objects and return them. **Warning: Specifying *allow_custom* may lead to critical errors if further processing (searching, filtering, modifying etc...) of the custom STIX2 content occurs where the custom STIX2 content supplied is not valid STIX2**. 
This is an axiomatic possibility as the STIX2 library cannot guarantee proper processing of unknown custom STIX2 objects that were explicitly flagged to be allowed, and thus may not be valid.\n", + "\n", + "For examples on parsing STIX2 objects with custom STIX properties, see [Custom STIX Content: Custom Properties](custom.ipynb#Custom-Properties)\n", + "\n", + "For examples on parsing defined custom STIX2 objects, see [Custom STIX Content: Custom STIX Object Types](custom.ipynb#Custom-STIX-Object-Types)\n", + "\n", + "For the case where it is desired to retrieve STIX2 content from a source (e.g. file system, TAXII) that may possibly have custom STIX2 content unknown to the user, the user can create a STIX2 DataStore/Source with the flag *allow_custom=True*. As mentioned above, this will configure the DataStore/Source to allow for unknown STIX2 content to be returned (albeit not converted to full STIX2 domain objects and properties); notable processing capabilities of the STIX2 library may be precluded by the unknown STIX2 content, if the content is not valid or actual STIX2 domain objects and properties." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from taxii2client import Collection\n", + "from stix2 import CompositeDataSource, FileSystemSource, TAXIICollectionSource\n", + "\n", + "# to allow for the retrieval of unknown custom STIX2 content,\n", + "# just create *Stores/*Sources with the 'allow_custom' flag\n", + "\n", + "# create FileSystemSource\n", + "fs = FileSystemSource(\"/path/to/stix2_data/\", allow_custom=True)\n", + "\n", + "# create TAXIICollectionSource\n", + "colxn = Collection('http://taxii_url')\n", + "ts = TAXIICollectionSource(colxn, allow_custom=True)\n" ] } ], diff --git a/stix2/core.py b/stix2/core.py index 64307ff..7de7984 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -73,7 +73,7 @@ STIX2_OBJ_MAPS = {} def parse(data, allow_custom=False, version=None): - """Deserialize a string or file-like object into a STIX object. + """Convert a string, dict or file-like object into a STIX object. Args: data (str, dict, file-like object): The STIX 2 content to be parsed. @@ -86,6 +86,13 @@ def parse(data, allow_custom=False, version=None): Returns: An instantiated Python STIX object. + WARNING: 'allow_custom=True' will allow for the return of any supplied STIX + dict(s) that cannot be found to map to any known STIX object types (both STIX2 + domain objects and defined custom STIX2 objects); NO validation is done. This is + done to allow the processing of possibly unknown custom STIX objects (example + scenario: I need to query a third-party TAXII endpoint that could provide custom + STIX objects that I don't know about ahead of time) + """ # convert STIX object to dict, if not already obj = get_dict(data) @@ -107,6 +114,16 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None): custom objects. Note that unknown custom objects cannot be parsed into STIX objects, and will be returned as is. Default: False. 
+ Returns: + An instantiated Python STIX object + + WARNING: 'allow_custom=True' will allow for the return of any supplied STIX + dict(s) that cannot be found to map to any known STIX object types (both STIX2 + domain objects and defined custom STIX2 objects); NO validation is done. This is + done to allow the processing of possibly unknown custom STIX objects (example + scenario: I need to query a third-party TAXII endpoint that could provide custom + STIX objects that I don't know about ahead of time) + + """ if not version: # Use latest version diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 76ad61b..cc8b32b 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -221,6 +221,20 @@ def test_parse_unregistered_custom_object_type(): assert "use the CustomObject decorator." in str(excinfo.value) +def test_parse_unregistered_custom_object_type_w_allow_custom(): + """parse an unknown custom object, allowed by passing + 'allow_custom' flag + """ + nt_string = """{ + "type": "x-foobar-observable", + "created": "2015-12-21T19:59:11Z", + "property1": "something" + }""" + + custom_obj = stix2.parse(nt_string, allow_custom=True) + assert custom_obj["type"] == "x-foobar-observable" + + @stix2.observables.CustomObservable('x-new-observable', [ ('property1', stix2.properties.StringProperty(required=True)), ('property2', stix2.properties.IntegerProperty()),