Merge pull request #150 from oasis-open/38-workbench

Create Workbench layer
stix2.0
Greg Back 2018-04-05 10:09:23 -05:00 committed by GitHub
commit a1ad90d43f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1280 additions and 39 deletions

View File

@ -1,5 +1,6 @@
[settings]
not_skip = __init__.py
skip = workbench.py
known_third_party =
dateutil,
ordereddict,

View File

@ -0,0 +1,5 @@
workbench
===============
.. automodule:: stix2.workbench
:members:

View File

@ -115,5 +115,16 @@ class STIXPropertyDocumenter(ClassDocumenter):
self.add_line('', '<stixattr>')
def autodoc_skipper(app, what, name, obj, skip, options):
"""Customize Sphinx to skip some member we don't want documented.
Skips anything containing ':autodoc-skip:' in its docstring.
"""
if obj.__doc__ and ':autodoc-skip:' in obj.__doc__:
return skip or True
return skip
def setup(app):
app.add_autodocumenter(STIXPropertyDocumenter)
app.connect('autodoc-skip-member', autodoc_skipper)

485
docs/guide/workbench.ipynb Normal file
View File

@ -0,0 +1,485 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"nbsphinx": "hidden"
},
"outputs": [],
"source": [
"# Delete this cell to re-enable tracebacks\n",
"import sys\n",
"ipython = get_ipython()\n",
"\n",
"def hide_traceback(exc_tuple=None, filename=None, tb_offset=None,\n",
" exception_only=False, running_compiled_code=False):\n",
" etype, value, tb = sys.exc_info()\n",
" return ipython._showtraceback(etype, value, ipython.InteractiveTB.get_exception_only(etype, value))\n",
"\n",
"ipython.showtraceback = hide_traceback"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"nbsphinx": "hidden"
},
"outputs": [],
"source": [
"# JSON output syntax highlighting\n",
"from __future__ import print_function\n",
"from pygments import highlight\n",
"from pygments.lexers import JsonLexer\n",
"from pygments.formatters import HtmlFormatter\n",
"from six.moves import builtins\n",
"from IPython.display import display, HTML\n",
"\n",
"def json_print(inpt):\n",
" string = str(inpt)\n",
" if string[0] == '{':\n",
" formatter = HtmlFormatter()\n",
" display(HTML('<style type=\"text/css\">{}</style>{}'.format(\n",
" formatter.get_style_defs('.highlight'),\n",
" highlight(string, JsonLexer(), formatter))))\n",
" else:\n",
" builtins.print(inpt)\n",
"\n",
"globals()['print'] = json_print"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using The Workbench"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The [Workbench API](../api/stix2.workbench.rst) hides most of the complexity of the rest of the library to make it easy to interact with STIX data. To use it, just import everything from ``stix2.workbench``:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"from stix2.workbench import *"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Retrieving STIX Data\n",
"\n",
"To get some STIX data to work with, let's set up a DataSource and add it to our workbench."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"from taxii2client import Collection\n",
"\n",
"collection = Collection(\"http://127.0.0.1:5000/trustgroup1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/\", user=\"admin\", password=\"Password0\")\n",
"tc_source = TAXIICollectionSource(collection)\n",
"add_data_source(tc_source)"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"Now we can get all of the indicators from the data source."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"response = indicators()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Similar functions are available for the other STIX Object types. See the full list [here](../api/stix2.workbench.rst#stix2.workbench.attack_patterns).\n",
"\n",
"If you want to only retrieve *some* indicators, you can pass in one or more [Filters](../api/datastore/stix2.datastore.filters.rst). This example finds all the indicators created by a specific identity:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"response = indicators(filters=Filter('created_by_ref', '=', 'identity--adede3e8-bf44-4e6f-b3c9-1958cbc3b188'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The objects returned let you easily traverse their relationships. Get all Relationship objects involving that object with ``.relationships()``, all other objects related to this object with ``.related()``, and the Identity object for the creator of the object (if one exists) with ``.created_by()``. For full details on these methods and their arguments, see the [Workbench API](../api/stix2.workbench.rst) documentation."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"indicator--a932fcc6-e032-176c-126f-cb970a5a1ade\n",
"indicates\n",
"malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111\n"
]
}
],
"source": [
"for i in indicators():\n",
" for rel in i.relationships():\n",
" print(rel.source_ref)\n",
" print(rel.relationship_type)\n",
" print(rel.target_ref)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<style type=\"text/css\">.highlight .hll { background-color: #ffffcc }\n",
".highlight { background: #f8f8f8; }\n",
".highlight .c { color: #408080; font-style: italic } /* Comment */\n",
".highlight .err { border: 1px solid #FF0000 } /* Error */\n",
".highlight .k { color: #008000; font-weight: bold } /* Keyword */\n",
".highlight .o { color: #666666 } /* Operator */\n",
".highlight .ch { color: #408080; font-style: italic } /* Comment.Hashbang */\n",
".highlight .cm { color: #408080; font-style: italic } /* Comment.Multiline */\n",
".highlight .cp { color: #BC7A00 } /* Comment.Preproc */\n",
".highlight .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */\n",
".highlight .c1 { color: #408080; font-style: italic } /* Comment.Single */\n",
".highlight .cs { color: #408080; font-style: italic } /* Comment.Special */\n",
".highlight .gd { color: #A00000 } /* Generic.Deleted */\n",
".highlight .ge { font-style: italic } /* Generic.Emph */\n",
".highlight .gr { color: #FF0000 } /* Generic.Error */\n",
".highlight .gh { color: #000080; font-weight: bold } /* Generic.Heading */\n",
".highlight .gi { color: #00A000 } /* Generic.Inserted */\n",
".highlight .go { color: #888888 } /* Generic.Output */\n",
".highlight .gp { color: #000080; font-weight: bold } /* Generic.Prompt */\n",
".highlight .gs { font-weight: bold } /* Generic.Strong */\n",
".highlight .gu { color: #800080; font-weight: bold } /* Generic.Subheading */\n",
".highlight .gt { color: #0044DD } /* Generic.Traceback */\n",
".highlight .kc { color: #008000; font-weight: bold } /* Keyword.Constant */\n",
".highlight .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */\n",
".highlight .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */\n",
".highlight .kp { color: #008000 } /* Keyword.Pseudo */\n",
".highlight .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */\n",
".highlight .kt { color: #B00040 } /* Keyword.Type */\n",
".highlight .m { color: #666666 } /* Literal.Number */\n",
".highlight .s { color: #BA2121 } /* Literal.String */\n",
".highlight .na { color: #7D9029 } /* Name.Attribute */\n",
".highlight .nb { color: #008000 } /* Name.Builtin */\n",
".highlight .nc { color: #0000FF; font-weight: bold } /* Name.Class */\n",
".highlight .no { color: #880000 } /* Name.Constant */\n",
".highlight .nd { color: #AA22FF } /* Name.Decorator */\n",
".highlight .ni { color: #999999; font-weight: bold } /* Name.Entity */\n",
".highlight .ne { color: #D2413A; font-weight: bold } /* Name.Exception */\n",
".highlight .nf { color: #0000FF } /* Name.Function */\n",
".highlight .nl { color: #A0A000 } /* Name.Label */\n",
".highlight .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */\n",
".highlight .nt { color: #008000; font-weight: bold } /* Name.Tag */\n",
".highlight .nv { color: #19177C } /* Name.Variable */\n",
".highlight .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */\n",
".highlight .w { color: #bbbbbb } /* Text.Whitespace */\n",
".highlight .mb { color: #666666 } /* Literal.Number.Bin */\n",
".highlight .mf { color: #666666 } /* Literal.Number.Float */\n",
".highlight .mh { color: #666666 } /* Literal.Number.Hex */\n",
".highlight .mi { color: #666666 } /* Literal.Number.Integer */\n",
".highlight .mo { color: #666666 } /* Literal.Number.Oct */\n",
".highlight .sa { color: #BA2121 } /* Literal.String.Affix */\n",
".highlight .sb { color: #BA2121 } /* Literal.String.Backtick */\n",
".highlight .sc { color: #BA2121 } /* Literal.String.Char */\n",
".highlight .dl { color: #BA2121 } /* Literal.String.Delimiter */\n",
".highlight .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */\n",
".highlight .s2 { color: #BA2121 } /* Literal.String.Double */\n",
".highlight .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */\n",
".highlight .sh { color: #BA2121 } /* Literal.String.Heredoc */\n",
".highlight .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */\n",
".highlight .sx { color: #008000 } /* Literal.String.Other */\n",
".highlight .sr { color: #BB6688 } /* Literal.String.Regex */\n",
".highlight .s1 { color: #BA2121 } /* Literal.String.Single */\n",
".highlight .ss { color: #19177C } /* Literal.String.Symbol */\n",
".highlight .bp { color: #008000 } /* Name.Builtin.Pseudo */\n",
".highlight .fm { color: #0000FF } /* Name.Function.Magic */\n",
".highlight .vc { color: #19177C } /* Name.Variable.Class */\n",
".highlight .vg { color: #19177C } /* Name.Variable.Global */\n",
".highlight .vi { color: #19177C } /* Name.Variable.Instance */\n",
".highlight .vm { color: #19177C } /* Name.Variable.Magic */\n",
".highlight .il { color: #666666 } /* Literal.Number.Integer.Long */</style><div class=\"highlight\"><pre><span></span><span class=\"p\">{</span>\n",
" <span class=\"nt\">&quot;type&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;malware&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;id&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;created&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2017-01-27T13:49:53.997Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;modified&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2017-01-27T13:49:53.997Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;name&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;Poison Ivy&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;description&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;Poison Ivy&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;labels&quot;</span><span class=\"p\">:</span> <span class=\"p\">[</span>\n",
" <span class=\"s2\">&quot;remote-access-trojan&quot;</span>\n",
" <span class=\"p\">]</span>\n",
"<span class=\"p\">}</span>\n",
"</pre></div>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"for i in indicators():\n",
" for obj in i.related():\n",
" print(obj)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If there are a lot of related objects, you can narrow it down by passing in one or more [Filters](../api/datastore/stix2.datastore.filters.rst) just as before. For example, if we want to get only the indicators related to a specific piece of malware (and not any entities that use it or are targeted by it):"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<style type=\"text/css\">.highlight .hll { background-color: #ffffcc }\n",
".highlight { background: #f8f8f8; }\n",
".highlight .c { color: #408080; font-style: italic } /* Comment */\n",
".highlight .err { border: 1px solid #FF0000 } /* Error */\n",
".highlight .k { color: #008000; font-weight: bold } /* Keyword */\n",
".highlight .o { color: #666666 } /* Operator */\n",
".highlight .ch { color: #408080; font-style: italic } /* Comment.Hashbang */\n",
".highlight .cm { color: #408080; font-style: italic } /* Comment.Multiline */\n",
".highlight .cp { color: #BC7A00 } /* Comment.Preproc */\n",
".highlight .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */\n",
".highlight .c1 { color: #408080; font-style: italic } /* Comment.Single */\n",
".highlight .cs { color: #408080; font-style: italic } /* Comment.Special */\n",
".highlight .gd { color: #A00000 } /* Generic.Deleted */\n",
".highlight .ge { font-style: italic } /* Generic.Emph */\n",
".highlight .gr { color: #FF0000 } /* Generic.Error */\n",
".highlight .gh { color: #000080; font-weight: bold } /* Generic.Heading */\n",
".highlight .gi { color: #00A000 } /* Generic.Inserted */\n",
".highlight .go { color: #888888 } /* Generic.Output */\n",
".highlight .gp { color: #000080; font-weight: bold } /* Generic.Prompt */\n",
".highlight .gs { font-weight: bold } /* Generic.Strong */\n",
".highlight .gu { color: #800080; font-weight: bold } /* Generic.Subheading */\n",
".highlight .gt { color: #0044DD } /* Generic.Traceback */\n",
".highlight .kc { color: #008000; font-weight: bold } /* Keyword.Constant */\n",
".highlight .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */\n",
".highlight .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */\n",
".highlight .kp { color: #008000 } /* Keyword.Pseudo */\n",
".highlight .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */\n",
".highlight .kt { color: #B00040 } /* Keyword.Type */\n",
".highlight .m { color: #666666 } /* Literal.Number */\n",
".highlight .s { color: #BA2121 } /* Literal.String */\n",
".highlight .na { color: #7D9029 } /* Name.Attribute */\n",
".highlight .nb { color: #008000 } /* Name.Builtin */\n",
".highlight .nc { color: #0000FF; font-weight: bold } /* Name.Class */\n",
".highlight .no { color: #880000 } /* Name.Constant */\n",
".highlight .nd { color: #AA22FF } /* Name.Decorator */\n",
".highlight .ni { color: #999999; font-weight: bold } /* Name.Entity */\n",
".highlight .ne { color: #D2413A; font-weight: bold } /* Name.Exception */\n",
".highlight .nf { color: #0000FF } /* Name.Function */\n",
".highlight .nl { color: #A0A000 } /* Name.Label */\n",
".highlight .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */\n",
".highlight .nt { color: #008000; font-weight: bold } /* Name.Tag */\n",
".highlight .nv { color: #19177C } /* Name.Variable */\n",
".highlight .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */\n",
".highlight .w { color: #bbbbbb } /* Text.Whitespace */\n",
".highlight .mb { color: #666666 } /* Literal.Number.Bin */\n",
".highlight .mf { color: #666666 } /* Literal.Number.Float */\n",
".highlight .mh { color: #666666 } /* Literal.Number.Hex */\n",
".highlight .mi { color: #666666 } /* Literal.Number.Integer */\n",
".highlight .mo { color: #666666 } /* Literal.Number.Oct */\n",
".highlight .sa { color: #BA2121 } /* Literal.String.Affix */\n",
".highlight .sb { color: #BA2121 } /* Literal.String.Backtick */\n",
".highlight .sc { color: #BA2121 } /* Literal.String.Char */\n",
".highlight .dl { color: #BA2121 } /* Literal.String.Delimiter */\n",
".highlight .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */\n",
".highlight .s2 { color: #BA2121 } /* Literal.String.Double */\n",
".highlight .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */\n",
".highlight .sh { color: #BA2121 } /* Literal.String.Heredoc */\n",
".highlight .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */\n",
".highlight .sx { color: #008000 } /* Literal.String.Other */\n",
".highlight .sr { color: #BB6688 } /* Literal.String.Regex */\n",
".highlight .s1 { color: #BA2121 } /* Literal.String.Single */\n",
".highlight .ss { color: #19177C } /* Literal.String.Symbol */\n",
".highlight .bp { color: #008000 } /* Name.Builtin.Pseudo */\n",
".highlight .fm { color: #0000FF } /* Name.Function.Magic */\n",
".highlight .vc { color: #19177C } /* Name.Variable.Class */\n",
".highlight .vg { color: #19177C } /* Name.Variable.Global */\n",
".highlight .vi { color: #19177C } /* Name.Variable.Instance */\n",
".highlight .vm { color: #19177C } /* Name.Variable.Magic */\n",
".highlight .il { color: #666666 } /* Literal.Number.Integer.Long */</style><div class=\"highlight\"><pre><span></span><span class=\"p\">{</span>\n",
" <span class=\"nt\">&quot;type&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;indicator&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;id&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;indicator--a932fcc6-e032-176c-126f-cb970a5a1ade&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;created&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2014-05-08T09:00:00.000Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;modified&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2014-05-08T09:00:00.000Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;name&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;File hash for Poison Ivy variant&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;pattern&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;[file:hashes.&#39;SHA-256&#39; = &#39;ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c&#39;]&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;valid_from&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2014-05-08T09:00:00Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;labels&quot;</span><span class=\"p\">:</span> <span class=\"p\">[</span>\n",
" <span class=\"s2\">&quot;file-hash-watchlist&quot;</span>\n",
" <span class=\"p\">]</span>\n",
"<span class=\"p\">}</span>\n",
"</pre></div>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"malware = get('malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111')\n",
"indicator = malware.related(filters=Filter('type', '=', 'indicator'))\n",
"print(indicator[0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Creating STIX Data\n",
"\n",
"To create a STIX object, just use that object's class constructor. Once it's created, add it to the workbench with [save()](../api/datastore/stix2.workbench.rst#stix2.workbench.save)."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"identity = Identity(name=\"ACME Threat Intel Co.\", identity_class=\"organization\")\n",
"save(identity)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also set defaults for certain properties when creating objects. For example, let's set the default creator to be the identity object we just created:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"set_default_creator(identity)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now when we create an indicator (or any other STIX Domain Object), it will automatically have the right ``create_by_ref`` value."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ACME Threat Intel Co.\n"
]
}
],
"source": [
"indicator = Indicator(labels=[\"malicious-activity\"], pattern=\"[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']\")\n",
"save(indicator)\n",
"\n",
"indicator_creator = get(indicator.created_by_ref)\n",
"print(indicator_creator.name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Defaults can also be set for the [created timestamp](../api/datastore/stix2.workbench.rst#stix2.workbench.set_default_created), [external references](../api/datastore/stix2.workbench.rst#stix2.workbench.set_default_external_refs) and [object marking references](../api/datastore/stix2.workbench.rst#stix2.workbench.set_default_object_marking_refs)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<div class=\"alert alert-warning\">\n",
"\n",
"**Warning:**\n",
"\n",
"The workbench layer replaces STIX Object classes with special versions of them that use \"wrappers\" to provide extra functionality. Because of this, we recommend that you **either use the workbench layer or the rest of the library, but not both**. In other words, don't import from both ``stix2.workbench`` and any other submodules of ``stix2``.\n",
"\n",
"</div>"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -11,6 +11,7 @@
patterns
properties
utils
workbench
v20.common
v20.observables
v20.sdo

View File

@ -16,7 +16,7 @@ import uuid
from six import with_metaclass
from stix2.datastore.filters import Filter
from stix2.datastore.filters import Filter, _assemble_filters
from stix2.utils import deduplicate
@ -73,7 +73,7 @@ class DataStoreMixin(object):
stix_id (str): the id of the STIX object to retrieve.
Returns:
stix_objs (list): a list of STIX objects
list: All versions of the specified STIX object.
"""
try:
@ -91,7 +91,7 @@ class DataStoreMixin(object):
to conduct search on.
Returns:
stix_objs (list): a list of STIX objects
list: The STIX objects matching the query.
"""
try:
@ -99,6 +99,25 @@ class DataStoreMixin(object):
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def query_by_type(self, *args, **kwargs):
"""Retrieve all objects of the given STIX object type.
Translate query_by_type() call to the appropriate DataSource call.
Args:
obj_type (str): The STIX object type to retrieve.
filters (list, optional): A list of additional filters to apply to
the query.
Returns:
list: The STIX objects that matched the query.
"""
try:
return self.source.query_by_type(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def creator_of(self, *args, **kwargs):
"""Retrieve the Identity refered to by the object's `created_by_ref`.
@ -136,7 +155,7 @@ class DataStoreMixin(object):
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
list: The Relationship objects involving the given STIX object.
"""
try:
@ -162,9 +181,11 @@ class DataStoreMixin(object):
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
filters (list): list of additional filters the related objects must
match.
Returns:
(list): List of STIX objects related to the given STIX object.
list: The STIX objects related to the given STIX object.
"""
try:
@ -175,8 +196,8 @@ class DataStoreMixin(object):
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
Define custom behavior before storing STIX objects using the associated
DataSink. Translates add() to the appropriate DataSink call.
Defines custom behavior before storing STIX objects using the
appropriate method call on the associated DataSink.
Args:
stix_objs (list): a list of STIX objects
@ -240,7 +261,7 @@ class DataSource(with_metaclass(ABCMeta)):
specified by the "id".
Returns:
stix_obj: the STIX object
stix_obj: The STIX object.
"""
@ -258,7 +279,7 @@ class DataSource(with_metaclass(ABCMeta)):
specified by the "id".
Returns:
stix_objs (list): a list of STIX objects
list: All versions of the specified STIX object.
"""
@ -273,7 +294,7 @@ class DataSource(with_metaclass(ABCMeta)):
to conduct search on.
Returns:
stix_objs (list): a list of STIX objects
list: The STIX objects that matched the query.
"""
@ -311,7 +332,7 @@ class DataSource(with_metaclass(ABCMeta)):
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
list: The Relationship objects involving the given STIX object.
"""
results = []
@ -338,7 +359,7 @@ class DataSource(with_metaclass(ABCMeta)):
return results
def related_to(self, obj, relationship_type=None, source_only=False, target_only=False):
def related_to(self, obj, relationship_type=None, source_only=False, target_only=False, filters=None):
"""Retrieve STIX Objects that have a Relationship involving the given
STIX object.
@ -354,9 +375,11 @@ class DataSource(with_metaclass(ABCMeta)):
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
filters (list): list of additional filters the related objects must
match.
Returns:
(list): List of STIX objects related to the given STIX object.
list: The STIX objects related to the given STIX object.
"""
results = []
@ -372,10 +395,13 @@ class DataSource(with_metaclass(ABCMeta)):
ids = set()
for r in rels:
ids.update((r.source_ref, r.target_ref))
ids.remove(obj_id)
ids.discard(obj_id)
# Assemble filters
filter_list = _assemble_filters(filters)
for i in ids:
results.append(self.get(i))
results.extend(self.query(filter_list + [Filter('id', '=', i)]))
return results
@ -425,7 +451,7 @@ class CompositeDataSource(DataSource):
to another parent CompositeDataSource), not user supplied.
Returns:
stix_obj: the STIX object to be returned.
stix_obj: The STIX object to be returned.
"""
if not self.has_data_sources():
@ -471,7 +497,7 @@ class CompositeDataSource(DataSource):
attached to a parent CompositeDataSource), not user supplied.
Returns:
all_data (list): list of STIX objects that have the specified id
list: The STIX objects that have the specified id.
"""
if not self.has_data_sources():
@ -510,7 +536,7 @@ class CompositeDataSource(DataSource):
attached to a parent CompositeDataSource), not user supplied.
Returns:
all_data (list): list of STIX objects to be returned
list: The STIX objects to be returned.
"""
if not self.has_data_sources():
@ -542,6 +568,35 @@ class CompositeDataSource(DataSource):
return all_data
def query_by_type(self, *args, **kwargs):
"""Retrieve all objects of the given STIX object type.
Federate the query to all DataSources attached to the
Composite Data Source.
Args:
obj_type (str): The STIX object type to retrieve.
filters (list, optional): A list of additional filters to apply to
the query.
Returns:
list: The STIX objects that matched the query.
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
results = []
for ds in self.data_sources:
results.extend(ds.query_by_type(*args, **kwargs))
# remove exact duplicates (where duplicates are STIX 2.0
# objects with the same 'id' and 'modified' values)
if len(results) > 0:
results = deduplicate(results)
return results
def relationships(self, *args, **kwargs):
"""Retrieve Relationships involving the given STIX object.
@ -561,7 +616,7 @@ class CompositeDataSource(DataSource):
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
list: The Relationship objects involving the given STIX object.
"""
if not self.has_data_sources():
@ -597,9 +652,11 @@ class CompositeDataSource(DataSource):
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
filters (list): list of additional filters the related objects must
match.
Returns:
(list): List of STIX objects related to the given STIX object.
list: The STIX objects related to the given STIX object.
"""
if not self.has_data_sources():

View File

@ -44,6 +44,37 @@ def _check_filter_components(prop, op, value):
return True
def _assemble_filters(filters1=None, filters2=None):
"""Assemble a list of filters.
This can be used to allow certain functions to work correctly no matter if
the user provides a single filter or a list of them.
Args:
filters1 (Filter or list, optional): The single Filter or list of Filters to
coerce into a list of Filters.
filters2 (Filter or list, optional): The single Filter or list of Filters to
append to the list of Filters.
Returns:
List of Filters.
"""
if filters1 is None:
filter_list = []
elif not isinstance(filters1, list):
filter_list = [filters1]
else:
filter_list = filters1
if isinstance(filters2, list):
filter_list.extend(filters2)
elif filters2 is not None:
filter_list.append(filters2)
return filter_list
class Filter(collections.namedtuple("Filter", ['property', 'op', 'value'])):
"""STIX 2 filters that support the querying functionality of STIX 2
DataStores and DataSources.

View File

@ -30,19 +30,43 @@ class ObjectFactory(object):
self._defaults = {}
if created_by_ref:
self._defaults['created_by_ref'] = created_by_ref
self.set_default_creator(created_by_ref)
if created:
self._defaults['created'] = created
# If the user provides a default "created" time, we also want to use
# that as the modified time.
self._defaults['modified'] = created
self.set_default_created(created)
if external_references:
self._defaults['external_references'] = external_references
self.set_default_external_refs(external_references)
if object_marking_refs:
self._defaults['object_marking_refs'] = object_marking_refs
self.set_default_object_marking_refs(object_marking_refs)
self._list_append = list_append
self._list_properties = ['external_references', 'object_marking_refs']
def set_default_creator(self, creator=None):
"""Set default value for the `created_by_ref` property.
"""
self._defaults['created_by_ref'] = creator
def set_default_created(self, created=None):
"""Set default value for the `created` property.
"""
self._defaults['created'] = created
# If the user provides a default "created" time, we also want to use
# that as the modified time.
self._defaults['modified'] = created
def set_default_external_refs(self, external_references=None):
"""Set default external references.
"""
self._defaults['external_references'] = external_references
def set_default_object_marking_refs(self, object_marking_refs=None):
"""Set default object markings.
"""
self._defaults['object_marking_refs'] = object_marking_refs
def create(self, cls, **kwargs):
"""Create a STIX object using object factory defaults.
@ -94,6 +118,7 @@ class Environment(DataStoreMixin):
.. automethod:: relationships
.. automethod:: related_to
.. automethod:: add
"""
def __init__(self, factory=ObjectFactory(), store=None, source=None, sink=None):
@ -113,6 +138,22 @@ class Environment(DataStoreMixin):
return self.factory.create(*args, **kwargs)
create.__doc__ = ObjectFactory.create.__doc__
def set_default_creator(self, *args, **kwargs):
return self.factory.set_default_creator(*args, **kwargs)
set_default_creator.__doc__ = ObjectFactory.set_default_creator.__doc__
def set_default_created(self, *args, **kwargs):
return self.factory.set_default_created(*args, **kwargs)
set_default_created.__doc__ = ObjectFactory.set_default_created.__doc__
def set_default_external_refs(self, *args, **kwargs):
return self.factory.set_default_external_refs(*args, **kwargs)
set_default_external_refs.__doc__ = ObjectFactory.set_default_external_refs.__doc__
def set_default_object_marking_refs(self, *args, **kwargs):
return self.factory.set_default_object_marking_refs(*args, **kwargs)
set_default_object_marking_refs.__doc__ = ObjectFactory.set_default_object_marking_refs.__doc__
def add_filters(self, *args, **kwargs):
return self.source.filters.update(*args, **kwargs)

View File

@ -34,14 +34,18 @@ RELATIONSHIP_IDS = [
'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70'
]
# All required args for a Campaign instance
# *_KWARGS contains all required arguments to create an instance of that STIX object
# *_MORE_KWARGS contains all the required arguments, plus some optional ones
ATTACK_PATTERN_KWARGS = dict(
name="Phishing",
)
CAMPAIGN_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# All required args for a Campaign instance, plus some optional args
CAMPAIGN_MORE_KWARGS = dict(
type='campaign',
id=CAMPAIGN_ID,
@ -52,25 +56,29 @@ CAMPAIGN_MORE_KWARGS = dict(
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# Minimum required args for an Identity instance
COURSE_OF_ACTION_KWARGS = dict(
name="Block",
)
IDENTITY_KWARGS = dict(
name="John Smith",
identity_class="individual",
)
# Minimum required args for an Indicator instance
INDICATOR_KWARGS = dict(
labels=['malicious-activity'],
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
)
# Minimum required args for a Malware instance
INTRUSION_SET_KWARGS = dict(
name="Bobcat Breakin",
)
MALWARE_KWARGS = dict(
labels=['ransomware'],
name="Cryptolocker",
)
# All required args for a Malware instance, plus some optional args
MALWARE_MORE_KWARGS = dict(
type='malware',
id=MALWARE_ID,
@ -81,14 +89,45 @@ MALWARE_MORE_KWARGS = dict(
description="A ransomware related to ..."
)
# Minimum required args for a Relationship instance
OBSERVED_DATA_KWARGS = dict(
first_observed=FAKE_TIME,
last_observed=FAKE_TIME,
number_observed=1,
objects={
"0": {
"type": "windows-registry-key",
"key": "HKEY_LOCAL_MACHINE\\System\\Foo\\Bar",
}
}
)
REPORT_KWARGS = dict(
labels=["campaign"],
name="Bad Cybercrime",
published=FAKE_TIME,
object_refs=[INDICATOR_ID],
)
RELATIONSHIP_KWARGS = dict(
relationship_type="indicates",
source_ref=INDICATOR_ID,
target_ref=MALWARE_ID,
)
# Minimum required args for a Sighting instance
SIGHTING_KWARGS = dict(
sighting_of_ref=INDICATOR_ID,
)
THREAT_ACTOR_KWARGS = dict(
labels=["crime-syndicate"],
name="Evil Org",
)
TOOL_KWARGS = dict(
labels=["remote-access"],
name="VNC",
)
VULNERABILITY_KWARGS = dict(
name="Heartbleed",
)

View File

@ -4,7 +4,7 @@ from taxii2client import Collection
from stix2 import Filter, MemorySink, MemorySource
from stix2.datastore import (CompositeDataSource, DataSink, DataSource,
make_id, taxii)
from stix2.datastore.filters import apply_common_filters
from stix2.datastore.filters import _assemble_filters, apply_common_filters
from stix2.utils import deduplicate
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -473,6 +473,15 @@ def test_filters7():
assert len(resp) == 1
def test_assemble_filters():
filter1 = Filter("name", "=", "Malicious site hosting downloader")
filter2 = Filter("modified", ">", "2017-01-28T13:49:53.935Z")
result = _assemble_filters(filter1, filter2)
assert len(result) == 2
assert result[0].property == 'name'
assert result[1].property == 'modified'
def test_deduplicate():
unique = deduplicate(STIX_OBJS1)

View File

@ -47,7 +47,7 @@ def test_object_factory_created():
assert ind.modified == FAKE_TIME
def test_object_factory_external_resource():
def test_object_factory_external_reference():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report")
factory = stix2.ObjectFactory(external_references=ext_ref)

View File

@ -58,4 +58,10 @@ def test_parse_tool(data):
assert tool.labels == ["remote-access"]
assert tool.name == "VNC"
def test_tool_no_workbench_wrappers():
tool = stix2.Tool(name='VNC', labels=['remote-access'])
with pytest.raises(AttributeError):
tool.created_by()
# TODO: Add other examples

View File

@ -0,0 +1,262 @@
import os
import stix2
from stix2.workbench import (AttackPattern, Campaign, CourseOfAction,
ExternalReference, FileSystemSource, Filter,
Identity, Indicator, IntrusionSet, Malware,
MarkingDefinition, ObservedData, Relationship,
Report, StatementMarking, ThreatActor, Tool,
Vulnerability, add_data_source, all_versions,
attack_patterns, campaigns, courses_of_action,
create, get, identities, indicators,
intrusion_sets, malware, observed_data, query,
reports, save, set_default_created,
set_default_creator, set_default_external_refs,
set_default_object_marking_refs, threat_actors,
tools, vulnerabilities)
from .constants import (ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID,
CAMPAIGN_KWARGS, COURSE_OF_ACTION_ID,
COURSE_OF_ACTION_KWARGS, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID,
INTRUSION_SET_KWARGS, MALWARE_ID, MALWARE_KWARGS,
OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, REPORT_ID,
REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS,
TOOL_ID, TOOL_KWARGS, VULNERABILITY_ID,
VULNERABILITY_KWARGS)
def test_workbench_environment():
# Create a STIX object
ind = create(Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
save(ind)
resp = get(INDICATOR_ID)
assert resp['labels'][0] == 'malicious-activity'
resp = all_versions(INDICATOR_ID)
assert len(resp) == 1
# Search on something other than id
q = [Filter('type', '=', 'vulnerability')]
resp = query(q)
assert len(resp) == 0
def test_workbench_get_all_attack_patterns():
mal = AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
save(mal)
resp = attack_patterns()
assert len(resp) == 1
assert resp[0].id == ATTACK_PATTERN_ID
def test_workbench_get_all_campaigns():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
save(cam)
resp = campaigns()
assert len(resp) == 1
assert resp[0].id == CAMPAIGN_ID
def test_workbench_get_all_courses_of_action():
coa = CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
save(coa)
resp = courses_of_action()
assert len(resp) == 1
assert resp[0].id == COURSE_OF_ACTION_ID
def test_workbench_get_all_identities():
idty = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
save(idty)
resp = identities()
assert len(resp) == 1
assert resp[0].id == IDENTITY_ID
def test_workbench_get_all_indicators():
resp = indicators()
assert len(resp) == 1
assert resp[0].id == INDICATOR_ID
def test_workbench_get_all_intrusion_sets():
ins = IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
save(ins)
resp = intrusion_sets()
assert len(resp) == 1
assert resp[0].id == INTRUSION_SET_ID
def test_workbench_get_all_malware():
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
save(mal)
resp = malware()
assert len(resp) == 1
assert resp[0].id == MALWARE_ID
def test_workbench_get_all_observed_data():
od = ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
save(od)
resp = observed_data()
assert len(resp) == 1
assert resp[0].id == OBSERVED_DATA_ID
def test_workbench_get_all_reports():
rep = Report(id=REPORT_ID, **REPORT_KWARGS)
save(rep)
resp = reports()
assert len(resp) == 1
assert resp[0].id == REPORT_ID
def test_workbench_get_all_threat_actors():
thr = ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
save(thr)
resp = threat_actors()
assert len(resp) == 1
assert resp[0].id == THREAT_ACTOR_ID
def test_workbench_get_all_tools():
tool = Tool(id=TOOL_ID, **TOOL_KWARGS)
save(tool)
resp = tools()
assert len(resp) == 1
assert resp[0].id == TOOL_ID
def test_workbench_get_all_vulnerabilities():
vuln = Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
save(vuln)
resp = vulnerabilities()
assert len(resp) == 1
assert resp[0].id == VULNERABILITY_ID
def test_workbench_relationships():
rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID)
save(rel)
ind = get(INDICATOR_ID)
resp = ind.relationships()
assert len(resp) == 1
assert resp[0].relationship_type == 'indicates'
assert resp[0].source_ref == INDICATOR_ID
assert resp[0].target_ref == MALWARE_ID
def test_workbench_created_by():
intset = IntrusionSet(name="Breach 123", created_by_ref=IDENTITY_ID)
save(intset)
creator = intset.created_by()
assert creator.id == IDENTITY_ID
def test_workbench_related():
rel1 = Relationship(MALWARE_ID, 'targets', IDENTITY_ID)
rel2 = Relationship(CAMPAIGN_ID, 'uses', MALWARE_ID)
save([rel1, rel2])
resp = get(MALWARE_ID).related()
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
resp = get(MALWARE_ID).related(relationship_type='indicates')
assert len(resp) == 1
def test_workbench_related_with_filters():
malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID)
rel = Relationship(malware.id, 'variant-of', MALWARE_ID)
save([malware, rel])
filters = [Filter('created_by_ref', '=', IDENTITY_ID)]
resp = get(MALWARE_ID).related(filters=filters)
assert len(resp) == 1
assert resp[0].name == malware.name
assert resp[0].created_by_ref == IDENTITY_ID
# filters arg can also be single filter
resp = get(MALWARE_ID).related(filters=filters[0])
assert len(resp) == 1
def test_add_data_source():
fs_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
fs = FileSystemSource(fs_path)
add_data_source(fs)
resp = tools()
assert len(resp) == 3
resp_ids = [tool.id for tool in resp]
assert TOOL_ID in resp_ids
assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids
assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids
def test_additional_filter():
resp = tools(Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'))
assert len(resp) == 2
def test_additional_filters_list():
resp = tools([Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'),
Filter('name', '=', 'Windows Credential Editor')])
assert len(resp) == 1
def test_default_creator():
set_default_creator(IDENTITY_ID)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert 'created_by_ref' not in CAMPAIGN_KWARGS
assert campaign.created_by_ref == IDENTITY_ID
def test_default_created_timestamp():
timestamp = "2018-03-19T01:02:03.000Z"
set_default_created(timestamp)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert 'created' not in CAMPAIGN_KWARGS
assert stix2.utils.format_datetime(campaign.created) == timestamp
assert stix2.utils.format_datetime(campaign.modified) == timestamp
def test_default_external_refs():
ext_ref = ExternalReference(source_name="ACME Threat Intel",
description="Threat report")
set_default_external_refs(ext_ref)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert campaign.external_references[0].source_name == "ACME Threat Intel"
assert campaign.external_references[0].description == "Threat report"
def test_default_object_marking_refs():
stmt_marking = StatementMarking("Copyright 2016, Example Corp")
mark_def = MarkingDefinition(definition_type="statement",
definition=stmt_marking)
set_default_object_marking_refs(mark_def)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id

292
stix2/workbench.py Normal file
View File

@ -0,0 +1,292 @@
"""Functions and class wrappers for interacting with STIX data at a high level.
.. autofunction:: create
.. autofunction:: set_default_creator
.. autofunction:: set_default_created
.. autofunction:: set_default_external_refs
.. autofunction:: set_default_object_marking_refs
.. autofunction:: get
.. autofunction:: all_versions
.. autofunction:: query
.. autofunction:: creator_of
.. autofunction:: relationships
.. autofunction:: related_to
.. autofunction:: save
.. autofunction:: add_filters
.. autofunction:: add_filter
.. autofunction:: parse
.. autofunction:: add_data_source
.. autofunction:: add_data_sources
"""
import stix2
from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign
from . import CourseOfAction as _CourseOfAction
from . import Identity as _Identity
from . import Indicator as _Indicator
from . import IntrusionSet as _IntrusionSet
from . import Malware as _Malware
from . import ObservedData as _ObservedData
from . import Report as _Report
from . import ThreatActor as _ThreatActor
from . import Tool as _Tool
from . import Vulnerability as _Vulnerability
from . import (AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, # noqa: F401
Bundle, CustomExtension, CustomMarking, CustomObservable,
Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, Environment, ExtensionsProperty,
ExternalReference, File, FileSystemSource, Filter,
GranularMarking, HTTPRequestExt, ICMPExt, IPv4Address,
IPv6Address, KillChainPhase, MACAddress, MarkingDefinition,
MemoryStore, Mutex, NetworkTraffic, NTFSExt, parse_observable,
PDFExt, Process, RasterImageExt, Relationship, Sighting,
SocketExt, Software, StatementMarking, TAXIICollectionSource,
TCPExt, TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, TLPMarking,
UNIXAccountExt, URL, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType,
WindowsServiceExt, X509Certificate, X509V3ExtenstionsType)
from .datastore.filters import _assemble_filters
# Use an implicit MemoryStore
_environ = Environment(store=MemoryStore())
create = _environ.create
set_default_creator = _environ.set_default_creator
set_default_created = _environ.set_default_created
set_default_external_refs = _environ.set_default_external_refs
set_default_object_marking_refs = _environ.set_default_object_marking_refs
get = _environ.get
all_versions = _environ.all_versions
query = _environ.query
creator_of = _environ.creator_of
relationships = _environ.relationships
related_to = _environ.related_to
save = _environ.add
add_filters = _environ.add_filters
add_filter = _environ.add_filter
parse = _environ.parse
add_data_source = _environ.source.add_data_source
add_data_sources = _environ.source.add_data_sources
# Wrap SDOs with helper functions
STIX_OBJS = [_AttackPattern, _Campaign, _CourseOfAction, _Identity,
_Indicator, _IntrusionSet, _Malware, _ObservedData, _Report,
_ThreatActor, _Tool, _Vulnerability]
STIX_OBJ_DOCS = """
.. method:: created_by(*args, **kwargs)
{}
.. method:: relationships(*args, **kwargs)
{}
.. method:: related(*args, **kwargs)
{}
""".format(_environ.creator_of.__doc__,
_environ.relationships.__doc__,
_environ.related_to.__doc__)
def _created_by_wrapper(self, *args, **kwargs):
return _environ.creator_of(self, *args, **kwargs)
def _relationships_wrapper(self, *args, **kwargs):
return _environ.relationships(self, *args, **kwargs)
def _related_wrapper(self, *args, **kwargs):
return _environ.related_to(self, *args, **kwargs)
def _constructor_wrapper(obj_type):
# Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions
wrapped_type = type(obj_type.__name__, obj_type.__bases__, dict(
created_by=_created_by_wrapper,
relationships=_relationships_wrapper,
related=_related_wrapper,
**obj_type.__dict__
))
@staticmethod
def new_constructor(cls, *args, **kwargs):
x = _environ.create(wrapped_type, *args, **kwargs)
return x
return new_constructor
def _setup_workbench():
# Create wrapper classes whose constructors call the implicit environment's create()
for obj_type in STIX_OBJS:
new_class_dict = {
'__new__': _constructor_wrapper(obj_type),
'__doc__': 'Workbench wrapper around the `{0} <stix2.v20.sdo.html#stix2.v20.sdo.{0}>`__ object. {1}'.format(obj_type.__name__, STIX_OBJ_DOCS)
}
new_class = type(obj_type.__name__, (), new_class_dict)
# Add our new class to this module's globals and to the library-wide mapping.
# This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = new_class
stix2.OBJ_MAP[obj_type._type] = new_class
new_class = None
_setup_workbench()
# Functions to get all objects of a specific type
def attack_patterns(filters=None):
"""Retrieve all Attack Pattern objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'attack-pattern')])
return query(filter_list)
def campaigns(filters=None):
"""Retrieve all Campaign objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'campaign')])
return query(filter_list)
def courses_of_action(filters=None):
"""Retrieve all Course of Action objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'course-of-action')])
return query(filter_list)
def identities(filters=None):
"""Retrieve all Identity objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'identity')])
return query(filter_list)
def indicators(filters=None):
"""Retrieve all Indicator objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'indicator')])
return query(filter_list)
def intrusion_sets(filters=None):
"""Retrieve all Intrusion Set objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'intrusion-set')])
return query(filter_list)
def malware(filters=None):
"""Retrieve all Malware objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'malware')])
return query(filter_list)
def observed_data(filters=None):
"""Retrieve all Observed Data objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'observed-data')])
return query(filter_list)
def reports(filters=None):
"""Retrieve all Report objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'report')])
return query(filter_list)
def threat_actors(filters=None):
"""Retrieve all Threat Actor objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'threat-actor')])
return query(filter_list)
def tools(filters=None):
"""Retrieve all Tool objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'tool')])
return query(filter_list)
def vulnerabilities(filters=None):
"""Retrieve all Vulnerability objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = _assemble_filters(filters, [Filter('type', '=', 'vulnerability')])
return query(filter_list)

View File

@ -10,7 +10,8 @@ deps =
coverage
taxii2-client
commands =
py.test --cov=stix2 stix2/test/ --cov-report term-missing
py.test --ignore=stix2/test/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing
py.test stix2/test/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append
passenv = CI TRAVIS TRAVIS_*