Merge branch 'master' into taxii_collection_source_404
commit
126094106e
|
@ -3,7 +3,6 @@ language: python
|
||||||
cache: pip
|
cache: pip
|
||||||
python:
|
python:
|
||||||
- "2.7"
|
- "2.7"
|
||||||
- "3.3"
|
|
||||||
- "3.4"
|
- "3.4"
|
||||||
- "3.5"
|
- "3.5"
|
||||||
- "3.5-dev"
|
- "3.5-dev"
|
||||||
|
@ -16,6 +15,6 @@ install:
|
||||||
- pip install codecov
|
- pip install codecov
|
||||||
script:
|
script:
|
||||||
- tox
|
- tox
|
||||||
- if [[ $TRAVIS_PYTHON_VERSION != 2.6 ]]; then pre-commit run --all-files; fi
|
- pre-commit run --all-files
|
||||||
after_success:
|
after_success:
|
||||||
- codecov
|
- codecov
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"execution_count": 2,
|
"execution_count": 40,
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"collapsed": true,
|
"collapsed": true,
|
||||||
"nbsphinx": "hidden"
|
"nbsphinx": "hidden"
|
||||||
|
@ -454,6 +454,277 @@
|
||||||
"# attach multiple filters to a MemoryStore\n",
|
"# attach multiple filters to a MemoryStore\n",
|
||||||
"mem.source.filters.update([f1,f2])"
|
"mem.source.filters.update([f1,f2])"
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"## De-Referencing Relationships\n",
|
||||||
|
"\n",
|
||||||
|
"Given a STIX object, there are several ways to find other STIX objects related to it. To illustrate this, let's first create a [DataStore](../api/stix2.sources.rst#stix2.sources.DataStore) and add some objects and relationships."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 13,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from stix2 import Campaign, Identity, Indicator, Malware, Relationship\n",
|
||||||
|
"\n",
|
||||||
|
"mem = MemoryStore()\n",
|
||||||
|
"cam = Campaign(name='Charge', description='Attack!')\n",
|
||||||
|
"idy = Identity(name='John Doe', identity_class=\"individual\")\n",
|
||||||
|
"ind = Indicator(labels=['malicious-activity'], pattern=\"[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']\")\n",
|
||||||
|
"mal = Malware(labels=['ransomware'], name=\"Cryptolocker\", created_by_ref=idy)\n",
|
||||||
|
"rel1 = Relationship(ind, 'indicates', mal,)\n",
|
||||||
|
"rel2 = Relationship(mal, 'targets', idy)\n",
|
||||||
|
"rel3 = Relationship(cam, 'uses', mal)\n",
|
||||||
|
"mem.add([cam, idy, ind, mal, rel1, rel2, rel3])"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"If a STIX object has a `created_by_ref` property, you can use the [creator_of()](../api/stix2.sources.rst#stix2.sources.DataSource.creator_of) method to retrieve the [Identity](../api/stix2.v20.sdo.rst#stix2.v20.sdo.Identity) object that created it."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 14,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/html": [
|
||||||
|
"<style type=\"text/css\">.highlight .hll { background-color: #ffffcc }\n",
|
||||||
|
".highlight { background: #f8f8f8; }\n",
|
||||||
|
".highlight .c { color: #408080; font-style: italic } /* Comment */\n",
|
||||||
|
".highlight .err { border: 1px solid #FF0000 } /* Error */\n",
|
||||||
|
".highlight .k { color: #008000; font-weight: bold } /* Keyword */\n",
|
||||||
|
".highlight .o { color: #666666 } /* Operator */\n",
|
||||||
|
".highlight .ch { color: #408080; font-style: italic } /* Comment.Hashbang */\n",
|
||||||
|
".highlight .cm { color: #408080; font-style: italic } /* Comment.Multiline */\n",
|
||||||
|
".highlight .cp { color: #BC7A00 } /* Comment.Preproc */\n",
|
||||||
|
".highlight .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */\n",
|
||||||
|
".highlight .c1 { color: #408080; font-style: italic } /* Comment.Single */\n",
|
||||||
|
".highlight .cs { color: #408080; font-style: italic } /* Comment.Special */\n",
|
||||||
|
".highlight .gd { color: #A00000 } /* Generic.Deleted */\n",
|
||||||
|
".highlight .ge { font-style: italic } /* Generic.Emph */\n",
|
||||||
|
".highlight .gr { color: #FF0000 } /* Generic.Error */\n",
|
||||||
|
".highlight .gh { color: #000080; font-weight: bold } /* Generic.Heading */\n",
|
||||||
|
".highlight .gi { color: #00A000 } /* Generic.Inserted */\n",
|
||||||
|
".highlight .go { color: #888888 } /* Generic.Output */\n",
|
||||||
|
".highlight .gp { color: #000080; font-weight: bold } /* Generic.Prompt */\n",
|
||||||
|
".highlight .gs { font-weight: bold } /* Generic.Strong */\n",
|
||||||
|
".highlight .gu { color: #800080; font-weight: bold } /* Generic.Subheading */\n",
|
||||||
|
".highlight .gt { color: #0044DD } /* Generic.Traceback */\n",
|
||||||
|
".highlight .kc { color: #008000; font-weight: bold } /* Keyword.Constant */\n",
|
||||||
|
".highlight .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */\n",
|
||||||
|
".highlight .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */\n",
|
||||||
|
".highlight .kp { color: #008000 } /* Keyword.Pseudo */\n",
|
||||||
|
".highlight .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */\n",
|
||||||
|
".highlight .kt { color: #B00040 } /* Keyword.Type */\n",
|
||||||
|
".highlight .m { color: #666666 } /* Literal.Number */\n",
|
||||||
|
".highlight .s { color: #BA2121 } /* Literal.String */\n",
|
||||||
|
".highlight .na { color: #7D9029 } /* Name.Attribute */\n",
|
||||||
|
".highlight .nb { color: #008000 } /* Name.Builtin */\n",
|
||||||
|
".highlight .nc { color: #0000FF; font-weight: bold } /* Name.Class */\n",
|
||||||
|
".highlight .no { color: #880000 } /* Name.Constant */\n",
|
||||||
|
".highlight .nd { color: #AA22FF } /* Name.Decorator */\n",
|
||||||
|
".highlight .ni { color: #999999; font-weight: bold } /* Name.Entity */\n",
|
||||||
|
".highlight .ne { color: #D2413A; font-weight: bold } /* Name.Exception */\n",
|
||||||
|
".highlight .nf { color: #0000FF } /* Name.Function */\n",
|
||||||
|
".highlight .nl { color: #A0A000 } /* Name.Label */\n",
|
||||||
|
".highlight .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */\n",
|
||||||
|
".highlight .nt { color: #008000; font-weight: bold } /* Name.Tag */\n",
|
||||||
|
".highlight .nv { color: #19177C } /* Name.Variable */\n",
|
||||||
|
".highlight .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */\n",
|
||||||
|
".highlight .w { color: #bbbbbb } /* Text.Whitespace */\n",
|
||||||
|
".highlight .mb { color: #666666 } /* Literal.Number.Bin */\n",
|
||||||
|
".highlight .mf { color: #666666 } /* Literal.Number.Float */\n",
|
||||||
|
".highlight .mh { color: #666666 } /* Literal.Number.Hex */\n",
|
||||||
|
".highlight .mi { color: #666666 } /* Literal.Number.Integer */\n",
|
||||||
|
".highlight .mo { color: #666666 } /* Literal.Number.Oct */\n",
|
||||||
|
".highlight .sa { color: #BA2121 } /* Literal.String.Affix */\n",
|
||||||
|
".highlight .sb { color: #BA2121 } /* Literal.String.Backtick */\n",
|
||||||
|
".highlight .sc { color: #BA2121 } /* Literal.String.Char */\n",
|
||||||
|
".highlight .dl { color: #BA2121 } /* Literal.String.Delimiter */\n",
|
||||||
|
".highlight .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */\n",
|
||||||
|
".highlight .s2 { color: #BA2121 } /* Literal.String.Double */\n",
|
||||||
|
".highlight .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */\n",
|
||||||
|
".highlight .sh { color: #BA2121 } /* Literal.String.Heredoc */\n",
|
||||||
|
".highlight .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */\n",
|
||||||
|
".highlight .sx { color: #008000 } /* Literal.String.Other */\n",
|
||||||
|
".highlight .sr { color: #BB6688 } /* Literal.String.Regex */\n",
|
||||||
|
".highlight .s1 { color: #BA2121 } /* Literal.String.Single */\n",
|
||||||
|
".highlight .ss { color: #19177C } /* Literal.String.Symbol */\n",
|
||||||
|
".highlight .bp { color: #008000 } /* Name.Builtin.Pseudo */\n",
|
||||||
|
".highlight .fm { color: #0000FF } /* Name.Function.Magic */\n",
|
||||||
|
".highlight .vc { color: #19177C } /* Name.Variable.Class */\n",
|
||||||
|
".highlight .vg { color: #19177C } /* Name.Variable.Global */\n",
|
||||||
|
".highlight .vi { color: #19177C } /* Name.Variable.Instance */\n",
|
||||||
|
".highlight .vm { color: #19177C } /* Name.Variable.Magic */\n",
|
||||||
|
".highlight .il { color: #666666 } /* Literal.Number.Integer.Long */</style><div class=\"highlight\"><pre><span></span><span class=\"p\">{</span>\n",
|
||||||
|
" <span class=\"nt\">"type"</span><span class=\"p\">:</span> <span class=\"s2\">"identity"</span><span class=\"p\">,</span>\n",
|
||||||
|
" <span class=\"nt\">"id"</span><span class=\"p\">:</span> <span class=\"s2\">"identity--be3baac0-9aba-48a8-81e4-4408b1c379a8"</span><span class=\"p\">,</span>\n",
|
||||||
|
" <span class=\"nt\">"created"</span><span class=\"p\">:</span> <span class=\"s2\">"2017-11-21T22:14:45.213Z"</span><span class=\"p\">,</span>\n",
|
||||||
|
" <span class=\"nt\">"modified"</span><span class=\"p\">:</span> <span class=\"s2\">"2017-11-21T22:14:45.213Z"</span><span class=\"p\">,</span>\n",
|
||||||
|
" <span class=\"nt\">"name"</span><span class=\"p\">:</span> <span class=\"s2\">"John Doe"</span><span class=\"p\">,</span>\n",
|
||||||
|
" <span class=\"nt\">"identity_class"</span><span class=\"p\">:</span> <span class=\"s2\">"individual"</span>\n",
|
||||||
|
"<span class=\"p\">}</span>\n",
|
||||||
|
"</pre></div>\n"
|
||||||
|
],
|
||||||
|
"text/plain": [
|
||||||
|
"<IPython.core.display.HTML object>"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 14,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"print(mem.creator_of(mal))"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Use the [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) method to retrieve all the relationship objects that reference a STIX object."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 15,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"3"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 15,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"rels = mem.relationships(mal)\n",
|
||||||
|
"len(rels)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"You can limit it to only specific relationship types:"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 27,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"[Relationship(type='relationship', id='relationship--bd6fd399-c907-4feb-b1da-b90f15942f1d', created='2017-11-21T22:14:45.214Z', modified='2017-11-21T22:14:45.214Z', relationship_type=u'indicates', source_ref='indicator--5ee33ff0-c50d-456b-a8dd-b5d1b69a66e8', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4')]"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 27,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"mem.relationships(mal, relationship_type='indicates')"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"You can limit it to only relationships where the given object is the source:"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 28,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"[Relationship(type='relationship', id='relationship--7eb7f5cd-8bf2-4f7c-8756-84c0b5693b9a', created='2017-11-21T22:14:45.215Z', modified='2017-11-21T22:14:45.215Z', relationship_type=u'targets', source_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4', target_ref='identity--be3baac0-9aba-48a8-81e4-4408b1c379a8')]"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 28,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"mem.relationships(mal, source_only=True)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"And you can limit it to only relationships where the given object is the target:"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 30,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"[Relationship(type='relationship', id='relationship--bd6fd399-c907-4feb-b1da-b90f15942f1d', created='2017-11-21T22:14:45.214Z', modified='2017-11-21T22:14:45.214Z', relationship_type=u'indicates', source_ref='indicator--5ee33ff0-c50d-456b-a8dd-b5d1b69a66e8', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4'),\n",
|
||||||
|
" Relationship(type='relationship', id='relationship--3c759d40-c92a-430e-aab6-77d5c5763302', created='2017-11-21T22:14:45.215Z', modified='2017-11-21T22:14:45.215Z', relationship_type=u'uses', source_ref='campaign--82ab7aa4-d13b-4e99-8a09-ebcba30668a7', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4')]"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 30,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"mem.relationships(mal, target_only=True)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Finally, you can retrieve all STIX objects related to a given STIX object using [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to). This calls [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) but then performs the extra step of getting the objects that these Relationships point to. [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to) takes all the same arguments that [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) does."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 42,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"[Campaign(type='campaign', id='campaign--82ab7aa4-d13b-4e99-8a09-ebcba30668a7', created='2017-11-21T22:14:45.213Z', modified='2017-11-21T22:14:45.213Z', name=u'Charge', description=u'Attack!')]"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 42,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"mem.related_to(mal, target_only=True, relationship_type='uses')"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"metadata": {
|
"metadata": {
|
||||||
|
|
|
@ -128,7 +128,7 @@
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"metadata": {},
|
"metadata": {},
|
||||||
"source": [
|
"source": [
|
||||||
"You can retrieve STIX objects from the [DataSources](../api/stix2.sources.rst#stix2.sources.DataSource) in the [Environment](../api/stix2.environment.rst#stix2.environment.Environment) with [get()](../api/stix2.environment.rst#stix2.environment.Environment.get), [query()](../api/stix2.environment.rst#stix2.environment.Environment.query), and [all_versions()](../api/stix2.environment.rst#stix2.environment.Environment.all_versions), just as you would for a [DataSource](../api/stix2.sources.rst#stix2.sources.DataSource)."
|
"You can retrieve STIX objects from the [DataSources](../api/stix2.sources.rst#stix2.sources.DataSource) in the [Environment](../api/stix2.environment.rst#stix2.environment.Environment) with [get()](../api/stix2.environment.rst#stix2.environment.Environment.get), [query()](../api/stix2.environment.rst#stix2.environment.Environment.query), [all_versions()](../api/stix2.environment.rst#stix2.environment.Environment.all_versions), [creator_of()](../api/stix2.sources.rst#stix2.sources.DataSource.creator_of), [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to), and [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) just as you would for a [DataSource](../api/stix2.sources.rst#stix2.sources.DataSource)."
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
1
setup.py
1
setup.py
|
@ -39,7 +39,6 @@ setup(
|
||||||
'Programming Language :: Python :: 2',
|
'Programming Language :: Python :: 2',
|
||||||
'Programming Language :: Python :: 2.7',
|
'Programming Language :: Python :: 2.7',
|
||||||
'Programming Language :: Python :: 3',
|
'Programming Language :: Python :: 3',
|
||||||
'Programming Language :: Python :: 3.3',
|
|
||||||
'Programming Language :: Python :: 3.4',
|
'Programming Language :: Python :: 3.4',
|
||||||
'Programming Language :: Python :: 3.5',
|
'Programming Language :: Python :: 3.5',
|
||||||
'Programming Language :: Python :: 3.6',
|
'Programming Language :: Python :: 3.6',
|
||||||
|
|
|
@ -105,30 +105,13 @@ class Environment(object):
|
||||||
return self.factory.create(*args, **kwargs)
|
return self.factory.create(*args, **kwargs)
|
||||||
create.__doc__ = ObjectFactory.create.__doc__
|
create.__doc__ = ObjectFactory.create.__doc__
|
||||||
|
|
||||||
def get(self, *args, **kwargs):
|
get = DataStore.__dict__['get']
|
||||||
try:
|
all_versions = DataStore.__dict__['all_versions']
|
||||||
return self.source.get(*args, **kwargs)
|
query = DataStore.__dict__['query']
|
||||||
except AttributeError:
|
creator_of = DataStore.__dict__['creator_of']
|
||||||
raise AttributeError('Environment has no data source to query')
|
relationships = DataStore.__dict__['relationships']
|
||||||
get.__doc__ = DataStore.get.__doc__
|
related_to = DataStore.__dict__['related_to']
|
||||||
|
add = DataStore.__dict__['add']
|
||||||
def all_versions(self, *args, **kwargs):
|
|
||||||
"""Retrieve all versions of a single STIX object by ID.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return self.source.all_versions(*args, **kwargs)
|
|
||||||
except AttributeError:
|
|
||||||
raise AttributeError('Environment has no data source to query')
|
|
||||||
all_versions.__doc__ = DataStore.all_versions.__doc__
|
|
||||||
|
|
||||||
def query(self, *args, **kwargs):
|
|
||||||
"""Retrieve STIX objects matching a set of filters.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return self.source.query(*args, **kwargs)
|
|
||||||
except AttributeError:
|
|
||||||
raise AttributeError('Environment has no data source to query')
|
|
||||||
query.__doc__ = DataStore.query.__doc__
|
|
||||||
|
|
||||||
def add_filters(self, *args, **kwargs):
|
def add_filters(self, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
|
@ -142,31 +125,6 @@ class Environment(object):
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise AttributeError('Environment has no data source')
|
raise AttributeError('Environment has no data source')
|
||||||
|
|
||||||
def add(self, *args, **kwargs):
|
|
||||||
try:
|
|
||||||
return self.sink.add(*args, **kwargs)
|
|
||||||
except AttributeError:
|
|
||||||
raise AttributeError('Environment has no data sink to put objects in')
|
|
||||||
add.__doc__ = DataStore.add.__doc__
|
|
||||||
|
|
||||||
def parse(self, *args, **kwargs):
|
def parse(self, *args, **kwargs):
|
||||||
return _parse(*args, **kwargs)
|
return _parse(*args, **kwargs)
|
||||||
parse.__doc__ = _parse.__doc__
|
parse.__doc__ = _parse.__doc__
|
||||||
|
|
||||||
def creator_of(self, obj):
|
|
||||||
"""Retrieve the Identity refered to by the object's `created_by_ref`.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
obj: The STIX object whose `created_by_ref` property will be looked
|
|
||||||
up.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The STIX object's creator, or None, if the object contains no
|
|
||||||
`created_by_ref` property or the object's creator cannot be found.
|
|
||||||
|
|
||||||
"""
|
|
||||||
creator_id = obj.get('created_by_ref', '')
|
|
||||||
if creator_id:
|
|
||||||
return self.get(creator_id)
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ import uuid
|
||||||
|
|
||||||
from six import with_metaclass
|
from six import with_metaclass
|
||||||
|
|
||||||
|
from stix2.sources.filters import Filter
|
||||||
from stix2.utils import deduplicate
|
from stix2.utils import deduplicate
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,7 +59,10 @@ class DataStore(object):
|
||||||
object specified by the "id".
|
object specified by the "id".
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.get(*args, **kwargs)
|
try:
|
||||||
|
return self.source.get(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
|
||||||
|
|
||||||
def all_versions(self, *args, **kwargs):
|
def all_versions(self, *args, **kwargs):
|
||||||
"""Retrieve all versions of a single STIX object by ID.
|
"""Retrieve all versions of a single STIX object by ID.
|
||||||
|
@ -72,7 +76,10 @@ class DataStore(object):
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.all_versions(*args, **kwargs)
|
try:
|
||||||
|
return self.source.all_versions(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
|
||||||
|
|
||||||
def query(self, *args, **kwargs):
|
def query(self, *args, **kwargs):
|
||||||
"""Retrieve STIX objects matching a set of filters.
|
"""Retrieve STIX objects matching a set of filters.
|
||||||
|
@ -87,7 +94,83 @@ class DataStore(object):
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.query(*args, **kwargs)
|
try:
|
||||||
|
return self.source.query(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
|
||||||
|
|
||||||
|
def creator_of(self, *args, **kwargs):
|
||||||
|
"""Retrieve the Identity refered to by the object's `created_by_ref`.
|
||||||
|
|
||||||
|
Translate creator_of() call to the appropriate DataSource call.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj: The STIX object whose `created_by_ref` property will be looked
|
||||||
|
up.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The STIX object's creator, or None, if the object contains no
|
||||||
|
`created_by_ref` property or the object's creator cannot be found.
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return self.source.creator_of(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
|
||||||
|
|
||||||
|
def relationships(self, *args, **kwargs):
|
||||||
|
"""Retrieve Relationships involving the given STIX object.
|
||||||
|
|
||||||
|
Translate relationships() call to the appropriate DataSource call.
|
||||||
|
|
||||||
|
Only one of `source_only` and `target_only` may be `True`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
|
||||||
|
relationships will be looked up.
|
||||||
|
relationship_type (str): Only retrieve Relationships of this type.
|
||||||
|
If None, all relationships will be returned, regardless of type.
|
||||||
|
source_only (bool): Only retrieve Relationships for which this
|
||||||
|
object is the source_ref. Default: False.
|
||||||
|
target_only (bool): Only retrieve Relationships for which this
|
||||||
|
object is the target_ref. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list): List of Relationship objects involving the given STIX object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return self.source.relationships(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
|
||||||
|
|
||||||
|
def related_to(self, *args, **kwargs):
|
||||||
|
"""Retrieve STIX Objects that have a Relationship involving the given
|
||||||
|
STIX object.
|
||||||
|
|
||||||
|
Translate related_to() call to the appropriate DataSource call.
|
||||||
|
|
||||||
|
Only one of `source_only` and `target_only` may be `True`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
|
||||||
|
related objects will be looked up.
|
||||||
|
relationship_type (str): Only retrieve objects related by this
|
||||||
|
Relationships type. If None, all related objects will be
|
||||||
|
returned, regardless of type.
|
||||||
|
source_only (bool): Only examine Relationships for which this
|
||||||
|
object is the source_ref. Default: False.
|
||||||
|
target_only (bool): Only examine Relationships for which this
|
||||||
|
object is the target_ref. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list): List of STIX objects related to the given STIX object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return self.source.related_to(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
|
||||||
|
|
||||||
def add(self, *args, **kwargs):
|
def add(self, *args, **kwargs):
|
||||||
"""Method for storing STIX objects.
|
"""Method for storing STIX objects.
|
||||||
|
@ -99,7 +182,10 @@ class DataStore(object):
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.sink.add(*args, **kwargs)
|
try:
|
||||||
|
return self.sink.add(*args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise AttributeError('%s has no data sink to put objects in' % self.__class__.__name__)
|
||||||
|
|
||||||
|
|
||||||
class DataSink(with_metaclass(ABCMeta)):
|
class DataSink(with_metaclass(ABCMeta)):
|
||||||
|
@ -191,6 +277,108 @@ class DataSource(with_metaclass(ABCMeta)):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def creator_of(self, obj):
|
||||||
|
"""Retrieve the Identity refered to by the object's `created_by_ref`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj: The STIX object whose `created_by_ref` property will be looked
|
||||||
|
up.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The STIX object's creator, or None, if the object contains no
|
||||||
|
`created_by_ref` property or the object's creator cannot be found.
|
||||||
|
|
||||||
|
"""
|
||||||
|
creator_id = obj.get('created_by_ref', '')
|
||||||
|
if creator_id:
|
||||||
|
return self.get(creator_id)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
|
||||||
|
"""Retrieve Relationships involving the given STIX object.
|
||||||
|
|
||||||
|
Only one of `source_only` and `target_only` may be `True`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
|
||||||
|
relationships will be looked up.
|
||||||
|
relationship_type (str): Only retrieve Relationships of this type.
|
||||||
|
If None, all relationships will be returned, regardless of type.
|
||||||
|
source_only (bool): Only retrieve Relationships for which this
|
||||||
|
object is the source_ref. Default: False.
|
||||||
|
target_only (bool): Only retrieve Relationships for which this
|
||||||
|
object is the target_ref. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list): List of Relationship objects involving the given STIX object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
results = []
|
||||||
|
filters = [Filter('type', '=', 'relationship')]
|
||||||
|
|
||||||
|
try:
|
||||||
|
obj_id = obj['id']
|
||||||
|
except KeyError:
|
||||||
|
raise ValueError("STIX object has no 'id' property")
|
||||||
|
except TypeError:
|
||||||
|
# Assume `obj` is an ID string
|
||||||
|
obj_id = obj
|
||||||
|
|
||||||
|
if relationship_type:
|
||||||
|
filters.append(Filter('relationship_type', '=', relationship_type))
|
||||||
|
|
||||||
|
if source_only and target_only:
|
||||||
|
raise ValueError("Search either source only or target only, but not both")
|
||||||
|
|
||||||
|
if not target_only:
|
||||||
|
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
|
||||||
|
if not source_only:
|
||||||
|
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def related_to(self, obj, relationship_type=None, source_only=False, target_only=False):
|
||||||
|
"""Retrieve STIX Objects that have a Relationship involving the given
|
||||||
|
STIX object.
|
||||||
|
|
||||||
|
Only one of `source_only` and `target_only` may be `True`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
|
||||||
|
related objects will be looked up.
|
||||||
|
relationship_type (str): Only retrieve objects related by this
|
||||||
|
Relationships type. If None, all related objects will be
|
||||||
|
returned, regardless of type.
|
||||||
|
source_only (bool): Only examine Relationships for which this
|
||||||
|
object is the source_ref. Default: False.
|
||||||
|
target_only (bool): Only examine Relationships for which this
|
||||||
|
object is the target_ref. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list): List of STIX objects related to the given STIX object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
results = []
|
||||||
|
rels = self.relationships(obj, relationship_type, source_only, target_only)
|
||||||
|
|
||||||
|
try:
|
||||||
|
obj_id = obj['id']
|
||||||
|
except TypeError:
|
||||||
|
# Assume `obj` is an ID string
|
||||||
|
obj_id = obj
|
||||||
|
|
||||||
|
# Get all unique ids from the relationships except that of the object
|
||||||
|
ids = set()
|
||||||
|
for r in rels:
|
||||||
|
ids.update((r.source_ref, r.target_ref))
|
||||||
|
ids.remove(obj_id)
|
||||||
|
|
||||||
|
for i in ids:
|
||||||
|
results.append(self.get(i))
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
class CompositeDataSource(DataSource):
|
class CompositeDataSource(DataSource):
|
||||||
"""Controller for all the attached DataSources.
|
"""Controller for all the attached DataSources.
|
||||||
|
@ -354,6 +542,80 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
return all_data
|
return all_data
|
||||||
|
|
||||||
|
def relationships(self, *args, **kwargs):
|
||||||
|
"""Retrieve Relationships involving the given STIX object.
|
||||||
|
|
||||||
|
Only one of `source_only` and `target_only` may be `True`.
|
||||||
|
|
||||||
|
Federated relationships retrieve method - iterates through all
|
||||||
|
DataSources defined in "data_sources".
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
|
||||||
|
relationships will be looked up.
|
||||||
|
relationship_type (str): Only retrieve Relationships of this type.
|
||||||
|
If None, all relationships will be returned, regardless of type.
|
||||||
|
source_only (bool): Only retrieve Relationships for which this
|
||||||
|
object is the source_ref. Default: False.
|
||||||
|
target_only (bool): Only retrieve Relationships for which this
|
||||||
|
object is the target_ref. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list): List of Relationship objects involving the given STIX object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if not self.has_data_sources():
|
||||||
|
raise AttributeError('CompositeDataSource has no data sources')
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for ds in self.data_sources:
|
||||||
|
results.extend(ds.relationships(*args, **kwargs))
|
||||||
|
|
||||||
|
# remove exact duplicates (where duplicates are STIX 2.0
|
||||||
|
# objects with the same 'id' and 'modified' values)
|
||||||
|
if len(results) > 0:
|
||||||
|
results = deduplicate(results)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def related_to(self, *args, **kwargs):
|
||||||
|
"""Retrieve STIX Objects that have a Relationship involving the given
|
||||||
|
STIX object.
|
||||||
|
|
||||||
|
Only one of `source_only` and `target_only` may be `True`.
|
||||||
|
|
||||||
|
Federated related objects method - iterates through all
|
||||||
|
DataSources defined in "data_sources".
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
|
||||||
|
related objects will be looked up.
|
||||||
|
relationship_type (str): Only retrieve objects related by this
|
||||||
|
Relationships type. If None, all related objects will be
|
||||||
|
returned, regardless of type.
|
||||||
|
source_only (bool): Only examine Relationships for which this
|
||||||
|
object is the source_ref. Default: False.
|
||||||
|
target_only (bool): Only examine Relationships for which this
|
||||||
|
object is the target_ref. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list): List of STIX objects related to the given STIX object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if not self.has_data_sources():
|
||||||
|
raise AttributeError('CompositeDataSource has no data sources')
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for ds in self.data_sources:
|
||||||
|
results.extend(ds.related_to(*args, **kwargs))
|
||||||
|
|
||||||
|
# remove exact duplicates (where duplicates are STIX 2.0
|
||||||
|
# objects with the same 'id' and 'modified' values)
|
||||||
|
if len(results) > 0:
|
||||||
|
results = deduplicate(results)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
def add_data_source(self, data_source):
|
def add_data_source(self, data_source):
|
||||||
"""Attach a DataSource to CompositeDataSource instance
|
"""Attach a DataSource to CompositeDataSource instance
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,11 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
|
||||||
|
|
||||||
"""Supported filter value types"""
|
"""Supported filter value types"""
|
||||||
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
|
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
|
||||||
|
try:
|
||||||
|
FILTER_VALUE_TYPES.append(unicode)
|
||||||
|
except NameError:
|
||||||
|
# Python 3 doesn't need to worry about unicode
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _check_filter_components(prop, op, value):
|
def _check_filter_components(prop, op, value):
|
||||||
|
|
|
@ -28,6 +28,18 @@ MARKING_IDS = [
|
||||||
"marking-definition--68520ae2-fefe-43a9-84ee-2c2a934d2c7d",
|
"marking-definition--68520ae2-fefe-43a9-84ee-2c2a934d2c7d",
|
||||||
"marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f",
|
"marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f",
|
||||||
]
|
]
|
||||||
|
RELATIONSHIP_IDS = [
|
||||||
|
'relationship--06520621-5352-4e6a-b976-e8fa3d437ffd',
|
||||||
|
'relationship--181c9c09-43e6-45dd-9374-3bec192f05ef',
|
||||||
|
'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70'
|
||||||
|
]
|
||||||
|
|
||||||
|
# All required args for a Campaign instance
|
||||||
|
CAMPAIGN_KWARGS = dict(
|
||||||
|
name="Green Group Attacks Against Finance",
|
||||||
|
description="Campaign by Green Group against a series of targets in the financial services sector.",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# All required args for a Campaign instance, plus some optional args
|
# All required args for a Campaign instance, plus some optional args
|
||||||
CAMPAIGN_MORE_KWARGS = dict(
|
CAMPAIGN_MORE_KWARGS = dict(
|
||||||
|
|
|
@ -94,6 +94,28 @@ def test_custom_property_in_bundled_object():
|
||||||
assert '"x_foo": "bar"' in str(bundle)
|
assert '"x_foo": "bar"' in str(bundle)
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_marking_no_init_1():
|
||||||
|
@stix2.CustomMarking('x-new-obj', [
|
||||||
|
('property1', stix2.properties.StringProperty(required=True)),
|
||||||
|
])
|
||||||
|
class NewObj():
|
||||||
|
pass
|
||||||
|
|
||||||
|
no = NewObj(property1='something')
|
||||||
|
assert no.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_marking_no_init_2():
|
||||||
|
@stix2.CustomMarking('x-new-obj2', [
|
||||||
|
('property1', stix2.properties.StringProperty(required=True)),
|
||||||
|
])
|
||||||
|
class NewObj2(object):
|
||||||
|
pass
|
||||||
|
|
||||||
|
no2 = NewObj2(property1='something')
|
||||||
|
assert no2.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
@stix2.sdo.CustomObject('x-new-type', [
|
@stix2.sdo.CustomObject('x-new-type', [
|
||||||
('property1', stix2.properties.StringProperty(required=True)),
|
('property1', stix2.properties.StringProperty(required=True)),
|
||||||
('property2', stix2.properties.IntegerProperty()),
|
('property2', stix2.properties.IntegerProperty()),
|
||||||
|
@ -102,6 +124,15 @@ class NewType(object):
|
||||||
def __init__(self, property2=None, **kwargs):
|
def __init__(self, property2=None, **kwargs):
|
||||||
if property2 and property2 < 10:
|
if property2 and property2 < 10:
|
||||||
raise ValueError("'property2' is too small.")
|
raise ValueError("'property2' is too small.")
|
||||||
|
if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
|
||||||
|
raise TypeError("Must be integer!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_object_raises_exception():
|
||||||
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
NewType(property1='something', property3='something', allow_custom=True)
|
||||||
|
|
||||||
|
assert str(excinfo.value) == "Must be integer!"
|
||||||
|
|
||||||
|
|
||||||
def test_custom_object_type():
|
def test_custom_object_type():
|
||||||
|
@ -117,7 +148,7 @@ def test_custom_object_type():
|
||||||
assert "'property2' is too small." in str(excinfo.value)
|
assert "'property2' is too small." in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_custom_object_no_init():
|
def test_custom_object_no_init_1():
|
||||||
@stix2.sdo.CustomObject('x-new-obj', [
|
@stix2.sdo.CustomObject('x-new-obj', [
|
||||||
('property1', stix2.properties.StringProperty(required=True)),
|
('property1', stix2.properties.StringProperty(required=True)),
|
||||||
])
|
])
|
||||||
|
@ -127,6 +158,8 @@ def test_custom_object_no_init():
|
||||||
no = NewObj(property1='something')
|
no = NewObj(property1='something')
|
||||||
assert no.property1 == 'something'
|
assert no.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_object_no_init_2():
|
||||||
@stix2.sdo.CustomObject('x-new-obj2', [
|
@stix2.sdo.CustomObject('x-new-obj2', [
|
||||||
('property1', stix2.properties.StringProperty(required=True)),
|
('property1', stix2.properties.StringProperty(required=True)),
|
||||||
])
|
])
|
||||||
|
@ -170,23 +203,36 @@ class NewObservable():
|
||||||
def __init__(self, property2=None, **kwargs):
|
def __init__(self, property2=None, **kwargs):
|
||||||
if property2 and property2 < 10:
|
if property2 and property2 < 10:
|
||||||
raise ValueError("'property2' is too small.")
|
raise ValueError("'property2' is too small.")
|
||||||
|
if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
|
||||||
|
raise TypeError("Must be integer!")
|
||||||
|
|
||||||
|
|
||||||
def test_custom_observable_object():
|
def test_custom_observable_object_1():
|
||||||
no = NewObservable(property1='something')
|
no = NewObservable(property1='something')
|
||||||
assert no.property1 == 'something'
|
assert no.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_observable_object_2():
|
||||||
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
|
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
|
||||||
NewObservable(property2=42)
|
NewObservable(property2=42)
|
||||||
assert excinfo.value.properties == ['property1']
|
assert excinfo.value.properties == ['property1']
|
||||||
assert "No values for required properties" in str(excinfo.value)
|
assert "No values for required properties" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_observable_object_3():
|
||||||
with pytest.raises(ValueError) as excinfo:
|
with pytest.raises(ValueError) as excinfo:
|
||||||
NewObservable(property1='something', property2=4)
|
NewObservable(property1='something', property2=4)
|
||||||
assert "'property2' is too small." in str(excinfo.value)
|
assert "'property2' is too small." in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_custom_observable_object_no_init():
|
def test_custom_observable_raises_exception():
|
||||||
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
NewObservable(property1='something', property3='something', allow_custom=True)
|
||||||
|
|
||||||
|
assert str(excinfo.value) == "Must be integer!"
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_observable_object_no_init_1():
|
||||||
@stix2.observables.CustomObservable('x-new-observable', [
|
@stix2.observables.CustomObservable('x-new-observable', [
|
||||||
('property1', stix2.properties.StringProperty()),
|
('property1', stix2.properties.StringProperty()),
|
||||||
])
|
])
|
||||||
|
@ -196,6 +242,8 @@ def test_custom_observable_object_no_init():
|
||||||
no = NewObs(property1='something')
|
no = NewObs(property1='something')
|
||||||
assert no.property1 == 'something'
|
assert no.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_observable_object_no_init_2():
|
||||||
@stix2.observables.CustomObservable('x-new-obs2', [
|
@stix2.observables.CustomObservable('x-new-obs2', [
|
||||||
('property1', stix2.properties.StringProperty()),
|
('property1', stix2.properties.StringProperty()),
|
||||||
])
|
])
|
||||||
|
@ -354,6 +402,15 @@ class NewExtension():
|
||||||
def __init__(self, property2=None, **kwargs):
|
def __init__(self, property2=None, **kwargs):
|
||||||
if property2 and property2 < 10:
|
if property2 and property2 < 10:
|
||||||
raise ValueError("'property2' is too small.")
|
raise ValueError("'property2' is too small.")
|
||||||
|
if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
|
||||||
|
raise TypeError("Must be integer!")
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_extension_raises_exception():
|
||||||
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
NewExtension(property1='something', property3='something', allow_custom=True)
|
||||||
|
|
||||||
|
assert str(excinfo.value) == "Must be integer!"
|
||||||
|
|
||||||
|
|
||||||
def test_custom_extension():
|
def test_custom_extension():
|
||||||
|
@ -433,7 +490,7 @@ def test_custom_extension_empty_properties():
|
||||||
assert "'properties' must be a dict!" in str(excinfo.value)
|
assert "'properties' must be a dict!" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_custom_extension_no_init():
|
def test_custom_extension_no_init_1():
|
||||||
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-extension', {
|
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-extension', {
|
||||||
'property1': stix2.properties.StringProperty(required=True),
|
'property1': stix2.properties.StringProperty(required=True),
|
||||||
})
|
})
|
||||||
|
@ -443,6 +500,8 @@ def test_custom_extension_no_init():
|
||||||
ne = NewExt(property1="foobar")
|
ne = NewExt(property1="foobar")
|
||||||
assert ne.property1 == "foobar"
|
assert ne.property1 == "foobar"
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_extension_no_init_2():
|
||||||
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext2', {
|
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext2', {
|
||||||
'property1': stix2.properties.StringProperty(required=True),
|
'property1': stix2.properties.StringProperty(required=True),
|
||||||
})
|
})
|
||||||
|
|
|
@ -547,3 +547,11 @@ def test_composite_datasource_operations():
|
||||||
# nothing returns the same as cds1.query(query1) (the associated query is query2)
|
# nothing returns the same as cds1.query(query1) (the associated query is query2)
|
||||||
results = cds1.query([])
|
results = cds1.query([])
|
||||||
assert len(results) == 3
|
assert len(results) == 3
|
||||||
|
|
||||||
|
|
||||||
|
def test_composite_datastore_no_datasource():
|
||||||
|
cds = CompositeDataSource()
|
||||||
|
|
||||||
|
with pytest.raises(AttributeError) as excinfo:
|
||||||
|
cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||||
|
assert 'CompositeDataSource has no data source' in str(excinfo.value)
|
||||||
|
|
|
@ -2,8 +2,22 @@ import pytest
|
||||||
|
|
||||||
import stix2
|
import stix2
|
||||||
|
|
||||||
from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,
|
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID,
|
||||||
INDICATOR_KWARGS, MALWARE_ID)
|
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
|
||||||
|
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ds():
|
||||||
|
cam = stix2.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
||||||
|
idy = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
||||||
|
ind = stix2.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
|
mal = stix2.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
||||||
|
rel1 = stix2.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
|
||||||
|
rel2 = stix2.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
|
||||||
|
rel3 = stix2.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
|
||||||
|
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
|
||||||
|
yield stix2.MemoryStore(stix_objs)
|
||||||
|
|
||||||
|
|
||||||
def test_object_factory_created_by_ref_str():
|
def test_object_factory_created_by_ref_str():
|
||||||
|
@ -150,6 +164,14 @@ def test_environment_no_datastore():
|
||||||
env.query(INDICATOR_ID)
|
env.query(INDICATOR_ID)
|
||||||
assert 'Environment has no data source' in str(excinfo.value)
|
assert 'Environment has no data source' in str(excinfo.value)
|
||||||
|
|
||||||
|
with pytest.raises(AttributeError) as excinfo:
|
||||||
|
env.relationships(INDICATOR_ID)
|
||||||
|
assert 'Environment has no data source' in str(excinfo.value)
|
||||||
|
|
||||||
|
with pytest.raises(AttributeError) as excinfo:
|
||||||
|
env.related_to(INDICATOR_ID)
|
||||||
|
assert 'Environment has no data source' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_environment_add_filters():
|
def test_environment_add_filters():
|
||||||
env = stix2.Environment(factory=stix2.ObjectFactory())
|
env = stix2.Environment(factory=stix2.ObjectFactory())
|
||||||
|
@ -186,7 +208,7 @@ def test_parse_malware():
|
||||||
assert mal.name == "Cryptolocker"
|
assert mal.name == "Cryptolocker"
|
||||||
|
|
||||||
|
|
||||||
def test_created_by():
|
def test_creator_of():
|
||||||
identity = stix2.Identity(**IDENTITY_KWARGS)
|
identity = stix2.Identity(**IDENTITY_KWARGS)
|
||||||
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
||||||
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
|
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
|
||||||
|
@ -197,7 +219,7 @@ def test_created_by():
|
||||||
assert creator is identity
|
assert creator is identity
|
||||||
|
|
||||||
|
|
||||||
def test_created_by_no_datasource():
|
def test_creator_of_no_datasource():
|
||||||
identity = stix2.Identity(**IDENTITY_KWARGS)
|
identity = stix2.Identity(**IDENTITY_KWARGS)
|
||||||
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
||||||
env = stix2.Environment(factory=factory)
|
env = stix2.Environment(factory=factory)
|
||||||
|
@ -208,7 +230,7 @@ def test_created_by_no_datasource():
|
||||||
assert 'Environment has no data source' in str(excinfo.value)
|
assert 'Environment has no data source' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
def test_created_by_not_found():
|
def test_creator_of_not_found():
|
||||||
identity = stix2.Identity(**IDENTITY_KWARGS)
|
identity = stix2.Identity(**IDENTITY_KWARGS)
|
||||||
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
||||||
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
|
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
|
||||||
|
@ -216,3 +238,113 @@ def test_created_by_not_found():
|
||||||
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
|
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
|
||||||
creator = env.creator_of(ind)
|
creator = env.creator_of(ind)
|
||||||
assert creator is None
|
assert creator is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_creator_of_no_created_by_ref():
|
||||||
|
env = stix2.Environment(store=stix2.MemoryStore())
|
||||||
|
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
|
||||||
|
creator = env.creator_of(ind)
|
||||||
|
assert creator is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
mal = env.get(MALWARE_ID)
|
||||||
|
resp = env.relationships(mal)
|
||||||
|
|
||||||
|
assert len(resp) == 3
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_no_id(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
mal = {
|
||||||
|
"type": "malware",
|
||||||
|
"name": "some variant"
|
||||||
|
}
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
env.relationships(mal)
|
||||||
|
assert "object has no 'id' property" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_type(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
mal = env.get(MALWARE_ID)
|
||||||
|
resp = env.relationships(mal, relationship_type='indicates')
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_source(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
resp = env.relationships(MALWARE_ID, source_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
resp = env.relationships(MALWARE_ID, target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target_and_type(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target_and_source(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
env.relationships(MALWARE_ID, target_only=True, source_only=True)
|
||||||
|
|
||||||
|
assert 'not both' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
mal = env.get(MALWARE_ID)
|
||||||
|
resp = env.related_to(mal)
|
||||||
|
|
||||||
|
assert len(resp) == 3
|
||||||
|
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||||
|
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||||
|
assert any(x['id'] == IDENTITY_ID for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_no_id(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
mal = {
|
||||||
|
"type": "malware",
|
||||||
|
"name": "some variant"
|
||||||
|
}
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
env.related_to(mal)
|
||||||
|
assert "object has no 'id' property" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_by_source(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
resp = env.related_to(MALWARE_ID, source_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == IDENTITY_ID
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_by_target(ds):
|
||||||
|
env = stix2.Environment(store=ds)
|
||||||
|
resp = env.related_to(MALWARE_ID, target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||||
|
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||||
|
|
|
@ -4,7 +4,12 @@ import shutil
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
|
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
|
||||||
FileSystemSource, FileSystemStore, Filter, properties)
|
FileSystemSource, FileSystemStore, Filter, Identity,
|
||||||
|
Indicator, Malware, Relationship, properties)
|
||||||
|
|
||||||
|
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
|
||||||
|
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
|
||||||
|
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
|
||||||
|
|
||||||
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
||||||
|
|
||||||
|
@ -40,6 +45,25 @@ def fs_sink():
|
||||||
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='module')
|
||||||
|
def rel_fs_store():
|
||||||
|
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
||||||
|
idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
||||||
|
ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
|
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
||||||
|
rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
|
||||||
|
rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
|
||||||
|
rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
|
||||||
|
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
|
||||||
|
fs = FileSystemStore(FS_PATH)
|
||||||
|
for o in stix_objs:
|
||||||
|
fs.add(o)
|
||||||
|
yield fs
|
||||||
|
|
||||||
|
for o in stix_objs:
|
||||||
|
os.remove(os.path.join(FS_PATH, o.type, o.id + '.json'))
|
||||||
|
|
||||||
|
|
||||||
def test_filesystem_source_nonexistent_folder():
|
def test_filesystem_source_nonexistent_folder():
|
||||||
with pytest.raises(ValueError) as excinfo:
|
with pytest.raises(ValueError) as excinfo:
|
||||||
FileSystemSource('nonexistent-folder')
|
FileSystemSource('nonexistent-folder')
|
||||||
|
@ -375,3 +399,75 @@ def test_filesystem_custom_object(fs_store):
|
||||||
|
|
||||||
# remove dir
|
# remove dir
|
||||||
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
|
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships(rel_fs_store):
|
||||||
|
mal = rel_fs_store.get(MALWARE_ID)
|
||||||
|
resp = rel_fs_store.relationships(mal)
|
||||||
|
|
||||||
|
assert len(resp) == 3
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_type(rel_fs_store):
|
||||||
|
mal = rel_fs_store.get(MALWARE_ID)
|
||||||
|
resp = rel_fs_store.relationships(mal, relationship_type='indicates')
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_source(rel_fs_store):
|
||||||
|
resp = rel_fs_store.relationships(MALWARE_ID, source_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target(rel_fs_store):
|
||||||
|
resp = rel_fs_store.relationships(MALWARE_ID, target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target_and_type(rel_fs_store):
|
||||||
|
resp = rel_fs_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target_and_source(rel_fs_store):
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
rel_fs_store.relationships(MALWARE_ID, target_only=True, source_only=True)
|
||||||
|
|
||||||
|
assert 'not both' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to(rel_fs_store):
|
||||||
|
mal = rel_fs_store.get(MALWARE_ID)
|
||||||
|
resp = rel_fs_store.related_to(mal)
|
||||||
|
|
||||||
|
assert len(resp) == 3
|
||||||
|
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||||
|
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||||
|
assert any(x['id'] == IDENTITY_ID for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_by_source(rel_fs_store):
|
||||||
|
resp = rel_fs_store.related_to(MALWARE_ID, source_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert any(x['id'] == IDENTITY_ID for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_by_target(rel_fs_store):
|
||||||
|
resp = rel_fs_store.related_to(MALWARE_ID, target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||||
|
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||||
|
|
|
@ -187,7 +187,8 @@ def test_parse_marking_definition(data):
|
||||||
])
|
])
|
||||||
class NewMarking(object):
|
class NewMarking(object):
|
||||||
def __init__(self, property2=None, **kwargs):
|
def __init__(self, property2=None, **kwargs):
|
||||||
return
|
if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
|
||||||
|
raise TypeError("Must be integer!")
|
||||||
|
|
||||||
|
|
||||||
def test_registered_custom_marking():
|
def test_registered_custom_marking():
|
||||||
|
@ -208,6 +209,13 @@ def test_registered_custom_marking():
|
||||||
assert marking_def.definition_type == "x-new-marking-type"
|
assert marking_def.definition_type == "x-new-marking-type"
|
||||||
|
|
||||||
|
|
||||||
|
def test_registered_custom_marking_raises_exception():
|
||||||
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
NewMarking(property1='something', property3='something', allow_custom=True)
|
||||||
|
|
||||||
|
assert str(excinfo.value) == "Must be integer!"
|
||||||
|
|
||||||
|
|
||||||
def test_not_registered_marking_raises_exception():
|
def test_not_registered_marking_raises_exception():
|
||||||
with pytest.raises(ValueError) as excinfo:
|
with pytest.raises(ValueError) as excinfo:
|
||||||
# Used custom object on purpose to demonstrate a not-registered marking
|
# Used custom object on purpose to demonstrate a not-registered marking
|
||||||
|
|
|
@ -3,10 +3,15 @@ import shutil
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from stix2 import (Bundle, Campaign, CustomObject, Filter, MemorySource,
|
from stix2 import (Bundle, Campaign, CustomObject, Filter, Identity, Indicator,
|
||||||
MemoryStore, properties)
|
Malware, MemorySource, MemoryStore, Relationship,
|
||||||
|
properties)
|
||||||
from stix2.sources import make_id
|
from stix2.sources import make_id
|
||||||
|
|
||||||
|
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
|
||||||
|
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
|
||||||
|
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
|
||||||
|
|
||||||
IND1 = {
|
IND1 = {
|
||||||
"created": "2017-01-27T13:49:53.935Z",
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
@ -118,6 +123,19 @@ def mem_source():
|
||||||
yield MemorySource(STIX_OBJS1)
|
yield MemorySource(STIX_OBJS1)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rel_mem_store():
|
||||||
|
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
||||||
|
idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
||||||
|
ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
|
||||||
|
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
|
||||||
|
rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
|
||||||
|
rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
|
||||||
|
rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
|
||||||
|
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
|
||||||
|
yield MemoryStore(stix_objs)
|
||||||
|
|
||||||
|
|
||||||
def test_memory_source_get(mem_source):
|
def test_memory_source_get(mem_source):
|
||||||
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
|
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
|
||||||
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
|
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
|
||||||
|
@ -287,3 +305,75 @@ def test_memory_store_custom_object(mem_store):
|
||||||
newobj_r = mem_store.get(newobj.id)
|
newobj_r = mem_store.get(newobj.id)
|
||||||
assert newobj_r.id == newobj.id
|
assert newobj_r.id == newobj.id
|
||||||
assert newobj_r.property1 == 'something'
|
assert newobj_r.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships(rel_mem_store):
|
||||||
|
mal = rel_mem_store.get(MALWARE_ID)
|
||||||
|
resp = rel_mem_store.relationships(mal)
|
||||||
|
|
||||||
|
assert len(resp) == 3
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_type(rel_mem_store):
|
||||||
|
mal = rel_mem_store.get(MALWARE_ID)
|
||||||
|
resp = rel_mem_store.relationships(mal, relationship_type='indicates')
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_source(rel_mem_store):
|
||||||
|
resp = rel_mem_store.relationships(MALWARE_ID, source_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target(rel_mem_store):
|
||||||
|
resp = rel_mem_store.relationships(MALWARE_ID, target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target_and_type(rel_mem_store):
|
||||||
|
resp = rel_mem_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_relationships_by_target_and_source(rel_mem_store):
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
rel_mem_store.relationships(MALWARE_ID, target_only=True, source_only=True)
|
||||||
|
|
||||||
|
assert 'not both' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to(rel_mem_store):
|
||||||
|
mal = rel_mem_store.get(MALWARE_ID)
|
||||||
|
resp = rel_mem_store.related_to(mal)
|
||||||
|
|
||||||
|
assert len(resp) == 3
|
||||||
|
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||||
|
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||||
|
assert any(x['id'] == IDENTITY_ID for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_by_source(rel_mem_store):
|
||||||
|
resp = rel_mem_store.related_to(MALWARE_ID, source_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 1
|
||||||
|
assert any(x['id'] == IDENTITY_ID for x in resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_related_to_by_target(rel_mem_store):
|
||||||
|
resp = rel_mem_store.related_to(MALWARE_ID, target_only=True)
|
||||||
|
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert any(x['id'] == CAMPAIGN_ID for x in resp)
|
||||||
|
assert any(x['id'] == INDICATOR_ID for x in resp)
|
||||||
|
|
|
@ -74,3 +74,11 @@ def test_get_dict(data):
|
||||||
def test_get_dict_invalid(data):
|
def test_get_dict_invalid(data):
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
stix2.utils.get_dict(data)
|
stix2.utils.get_dict(data)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('stix_id, typ', [
|
||||||
|
('malware--d69c8146-ab35-4d50-8382-6fc80e641d43', 'malware'),
|
||||||
|
('intrusion-set--899ce53f-13a0-479b-a0e4-67d46e241542', 'intrusion-set')
|
||||||
|
])
|
||||||
|
def test_get_type_from_id(stix_id, typ):
|
||||||
|
assert stix2.utils.get_type_from_id(stix_id) == typ
|
||||||
|
|
|
@ -314,3 +314,7 @@ def remove_custom_stix(stix_obj):
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
|
||||||
|
|
||||||
|
def get_type_from_id(stix_id):
|
||||||
|
return stix_id.split('--', 1)[0]
|
||||||
|
|
|
@ -145,7 +145,14 @@ def CustomMarking(type='x-custom-marking', properties=None):
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
_STIXBase.__init__(self, **kwargs)
|
_STIXBase.__init__(self, **kwargs)
|
||||||
cls.__init__(self, **kwargs)
|
try:
|
||||||
|
cls.__init__(self, **kwargs)
|
||||||
|
except (AttributeError, TypeError) as e:
|
||||||
|
# Don't accidentally catch errors raised in a custom __init__()
|
||||||
|
if ("has no attribute '__init__'" in str(e) or
|
||||||
|
str(e) == "object.__init__() takes no parameters"):
|
||||||
|
return
|
||||||
|
raise e
|
||||||
|
|
||||||
_register_marking(_Custom)
|
_register_marking(_Custom)
|
||||||
return _Custom
|
return _Custom
|
||||||
|
|
3
tox.ini
3
tox.ini
|
@ -1,5 +1,5 @@
|
||||||
[tox]
|
[tox]
|
||||||
envlist = py27,py33,py34,py35,py36,pycodestyle,isort-check
|
envlist = py27,py34,py35,py36,pycodestyle,isort-check
|
||||||
|
|
||||||
[testenv]
|
[testenv]
|
||||||
deps =
|
deps =
|
||||||
|
@ -36,7 +36,6 @@ commands =
|
||||||
[travis]
|
[travis]
|
||||||
python =
|
python =
|
||||||
2.7: py27, pycodestyle
|
2.7: py27, pycodestyle
|
||||||
3.3: py33, pycodestyle
|
|
||||||
3.4: py34, pycodestyle
|
3.4: py34, pycodestyle
|
||||||
3.5: py35, pycodestyle
|
3.5: py35, pycodestyle
|
||||||
3.6: py36, pycodestyle
|
3.6: py36, pycodestyle
|
||||||
|
|
Loading…
Reference in New Issue