Merge pull request #111 from oasis-open/89-relationships

Support de-referencing relationships
stix2.0
Greg Back 2017-11-27 14:19:29 +00:00 committed by GitHub
commit 98ca928dea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 897 additions and 63 deletions

View File

@ -23,7 +23,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 40,
"metadata": {
"collapsed": true,
"nbsphinx": "hidden"
@ -262,6 +262,277 @@
"# attach multiple filters to a MemoryStore\n",
"mem.source.filters.update([f1,f2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## De-Referencing Relationships\n",
"\n",
"Given a STIX object, there are several ways to find other STIX objects related to it. To illustrate this, let's first create a [DataStore](../api/stix2.sources.rst#stix2.sources.DataStore) and add some objects and relationships."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from stix2 import Campaign, Identity, Indicator, Malware, Relationship\n",
"\n",
"mem = MemoryStore()\n",
"cam = Campaign(name='Charge', description='Attack!')\n",
"idy = Identity(name='John Doe', identity_class=\"individual\")\n",
"ind = Indicator(labels=['malicious-activity'], pattern=\"[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']\")\n",
"mal = Malware(labels=['ransomware'], name=\"Cryptolocker\", created_by_ref=idy)\n",
"rel1 = Relationship(ind, 'indicates', mal,)\n",
"rel2 = Relationship(mal, 'targets', idy)\n",
"rel3 = Relationship(cam, 'uses', mal)\n",
"mem.add([cam, idy, ind, mal, rel1, rel2, rel3])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If a STIX object has a `created_by_ref` property, you can use the [creator_of()](../api/stix2.sources.rst#stix2.sources.DataSource.creator_of) method to retrieve the [Identity](../api/stix2.v20.sdo.rst#stix2.v20.sdo.Identity) object that created it."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<style type=\"text/css\">.highlight .hll { background-color: #ffffcc }\n",
".highlight { background: #f8f8f8; }\n",
".highlight .c { color: #408080; font-style: italic } /* Comment */\n",
".highlight .err { border: 1px solid #FF0000 } /* Error */\n",
".highlight .k { color: #008000; font-weight: bold } /* Keyword */\n",
".highlight .o { color: #666666 } /* Operator */\n",
".highlight .ch { color: #408080; font-style: italic } /* Comment.Hashbang */\n",
".highlight .cm { color: #408080; font-style: italic } /* Comment.Multiline */\n",
".highlight .cp { color: #BC7A00 } /* Comment.Preproc */\n",
".highlight .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */\n",
".highlight .c1 { color: #408080; font-style: italic } /* Comment.Single */\n",
".highlight .cs { color: #408080; font-style: italic } /* Comment.Special */\n",
".highlight .gd { color: #A00000 } /* Generic.Deleted */\n",
".highlight .ge { font-style: italic } /* Generic.Emph */\n",
".highlight .gr { color: #FF0000 } /* Generic.Error */\n",
".highlight .gh { color: #000080; font-weight: bold } /* Generic.Heading */\n",
".highlight .gi { color: #00A000 } /* Generic.Inserted */\n",
".highlight .go { color: #888888 } /* Generic.Output */\n",
".highlight .gp { color: #000080; font-weight: bold } /* Generic.Prompt */\n",
".highlight .gs { font-weight: bold } /* Generic.Strong */\n",
".highlight .gu { color: #800080; font-weight: bold } /* Generic.Subheading */\n",
".highlight .gt { color: #0044DD } /* Generic.Traceback */\n",
".highlight .kc { color: #008000; font-weight: bold } /* Keyword.Constant */\n",
".highlight .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */\n",
".highlight .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */\n",
".highlight .kp { color: #008000 } /* Keyword.Pseudo */\n",
".highlight .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */\n",
".highlight .kt { color: #B00040 } /* Keyword.Type */\n",
".highlight .m { color: #666666 } /* Literal.Number */\n",
".highlight .s { color: #BA2121 } /* Literal.String */\n",
".highlight .na { color: #7D9029 } /* Name.Attribute */\n",
".highlight .nb { color: #008000 } /* Name.Builtin */\n",
".highlight .nc { color: #0000FF; font-weight: bold } /* Name.Class */\n",
".highlight .no { color: #880000 } /* Name.Constant */\n",
".highlight .nd { color: #AA22FF } /* Name.Decorator */\n",
".highlight .ni { color: #999999; font-weight: bold } /* Name.Entity */\n",
".highlight .ne { color: #D2413A; font-weight: bold } /* Name.Exception */\n",
".highlight .nf { color: #0000FF } /* Name.Function */\n",
".highlight .nl { color: #A0A000 } /* Name.Label */\n",
".highlight .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */\n",
".highlight .nt { color: #008000; font-weight: bold } /* Name.Tag */\n",
".highlight .nv { color: #19177C } /* Name.Variable */\n",
".highlight .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */\n",
".highlight .w { color: #bbbbbb } /* Text.Whitespace */\n",
".highlight .mb { color: #666666 } /* Literal.Number.Bin */\n",
".highlight .mf { color: #666666 } /* Literal.Number.Float */\n",
".highlight .mh { color: #666666 } /* Literal.Number.Hex */\n",
".highlight .mi { color: #666666 } /* Literal.Number.Integer */\n",
".highlight .mo { color: #666666 } /* Literal.Number.Oct */\n",
".highlight .sa { color: #BA2121 } /* Literal.String.Affix */\n",
".highlight .sb { color: #BA2121 } /* Literal.String.Backtick */\n",
".highlight .sc { color: #BA2121 } /* Literal.String.Char */\n",
".highlight .dl { color: #BA2121 } /* Literal.String.Delimiter */\n",
".highlight .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */\n",
".highlight .s2 { color: #BA2121 } /* Literal.String.Double */\n",
".highlight .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */\n",
".highlight .sh { color: #BA2121 } /* Literal.String.Heredoc */\n",
".highlight .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */\n",
".highlight .sx { color: #008000 } /* Literal.String.Other */\n",
".highlight .sr { color: #BB6688 } /* Literal.String.Regex */\n",
".highlight .s1 { color: #BA2121 } /* Literal.String.Single */\n",
".highlight .ss { color: #19177C } /* Literal.String.Symbol */\n",
".highlight .bp { color: #008000 } /* Name.Builtin.Pseudo */\n",
".highlight .fm { color: #0000FF } /* Name.Function.Magic */\n",
".highlight .vc { color: #19177C } /* Name.Variable.Class */\n",
".highlight .vg { color: #19177C } /* Name.Variable.Global */\n",
".highlight .vi { color: #19177C } /* Name.Variable.Instance */\n",
".highlight .vm { color: #19177C } /* Name.Variable.Magic */\n",
".highlight .il { color: #666666 } /* Literal.Number.Integer.Long */</style><div class=\"highlight\"><pre><span></span><span class=\"p\">{</span>\n",
" <span class=\"nt\">&quot;type&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;identity&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;id&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;identity--be3baac0-9aba-48a8-81e4-4408b1c379a8&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;created&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2017-11-21T22:14:45.213Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;modified&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;2017-11-21T22:14:45.213Z&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;name&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;John Doe&quot;</span><span class=\"p\">,</span>\n",
" <span class=\"nt\">&quot;identity_class&quot;</span><span class=\"p\">:</span> <span class=\"s2\">&quot;individual&quot;</span>\n",
"<span class=\"p\">}</span>\n",
"</pre></div>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"print(mem.creator_of(mal))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Use the [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) method to retrieve all the relationship objects that reference a STIX object."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"3"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"rels = mem.relationships(mal)\n",
"len(rels)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can limit it to only specific relationship types:"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Relationship(type='relationship', id='relationship--bd6fd399-c907-4feb-b1da-b90f15942f1d', created='2017-11-21T22:14:45.214Z', modified='2017-11-21T22:14:45.214Z', relationship_type=u'indicates', source_ref='indicator--5ee33ff0-c50d-456b-a8dd-b5d1b69a66e8', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4')]"
]
},
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mem.relationships(mal, relationship_type='indicates')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can limit it to only relationships where the given object is the source:"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Relationship(type='relationship', id='relationship--7eb7f5cd-8bf2-4f7c-8756-84c0b5693b9a', created='2017-11-21T22:14:45.215Z', modified='2017-11-21T22:14:45.215Z', relationship_type=u'targets', source_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4', target_ref='identity--be3baac0-9aba-48a8-81e4-4408b1c379a8')]"
]
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mem.relationships(mal, source_only=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And you can limit it to only relationships where the given object is the target:"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Relationship(type='relationship', id='relationship--bd6fd399-c907-4feb-b1da-b90f15942f1d', created='2017-11-21T22:14:45.214Z', modified='2017-11-21T22:14:45.214Z', relationship_type=u'indicates', source_ref='indicator--5ee33ff0-c50d-456b-a8dd-b5d1b69a66e8', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4'),\n",
" Relationship(type='relationship', id='relationship--3c759d40-c92a-430e-aab6-77d5c5763302', created='2017-11-21T22:14:45.215Z', modified='2017-11-21T22:14:45.215Z', relationship_type=u'uses', source_ref='campaign--82ab7aa4-d13b-4e99-8a09-ebcba30668a7', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4')]"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mem.relationships(mal, target_only=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, you can retrieve all STIX objects related to a given STIX object using [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to). This calls [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) but then performs the extra step of getting the objects that these Relationships point to. [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to) takes all the same arguments that [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) does."
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Campaign(type='campaign', id='campaign--82ab7aa4-d13b-4e99-8a09-ebcba30668a7', created='2017-11-21T22:14:45.213Z', modified='2017-11-21T22:14:45.213Z', name=u'Charge', description=u'Attack!')]"
]
},
"execution_count": 42,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mem.related_to(mal, target_only=True, relationship_type='uses')"
]
}
],
"metadata": {

View File

@ -128,7 +128,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"You can retrieve STIX objects from the [DataSources](../api/stix2.sources.rst#stix2.sources.DataSource) in the [Environment](../api/stix2.environment.rst#stix2.environment.Environment) with [get()](../api/stix2.environment.rst#stix2.environment.Environment.get), [query()](../api/stix2.environment.rst#stix2.environment.Environment.query), and [all_versions()](../api/stix2.environment.rst#stix2.environment.Environment.all_versions), just as you would for a [DataSource](../api/stix2.sources.rst#stix2.sources.DataSource)."
"You can retrieve STIX objects from the [DataSources](../api/stix2.sources.rst#stix2.sources.DataSource) in the [Environment](../api/stix2.environment.rst#stix2.environment.Environment) with [get()](../api/stix2.environment.rst#stix2.environment.Environment.get), [query()](../api/stix2.environment.rst#stix2.environment.Environment.query), [all_versions()](../api/stix2.environment.rst#stix2.environment.Environment.all_versions), [creator_of()](../api/stix2.sources.rst#stix2.sources.DataSource.creator_of), [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to), and [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) just as you would for a [DataSource](../api/stix2.sources.rst#stix2.sources.DataSource)."
]
},
{

View File

@ -105,30 +105,13 @@ class Environment(object):
return self.factory.create(*args, **kwargs)
create.__doc__ = ObjectFactory.create.__doc__
def get(self, *args, **kwargs):
try:
return self.source.get(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data source to query')
get.__doc__ = DataStore.get.__doc__
def all_versions(self, *args, **kwargs):
"""Retrieve all versions of a single STIX object by ID.
"""
try:
return self.source.all_versions(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data source to query')
all_versions.__doc__ = DataStore.all_versions.__doc__
def query(self, *args, **kwargs):
"""Retrieve STIX objects matching a set of filters.
"""
try:
return self.source.query(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data source to query')
query.__doc__ = DataStore.query.__doc__
get = DataStore.__dict__['get']
all_versions = DataStore.__dict__['all_versions']
query = DataStore.__dict__['query']
creator_of = DataStore.__dict__['creator_of']
relationships = DataStore.__dict__['relationships']
related_to = DataStore.__dict__['related_to']
add = DataStore.__dict__['add']
def add_filters(self, *args, **kwargs):
try:
@ -142,31 +125,6 @@ class Environment(object):
except AttributeError:
raise AttributeError('Environment has no data source')
def add(self, *args, **kwargs):
try:
return self.sink.add(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data sink to put objects in')
add.__doc__ = DataStore.add.__doc__
def parse(self, *args, **kwargs):
return _parse(*args, **kwargs)
parse.__doc__ = _parse.__doc__
def creator_of(self, obj):
"""Retrieve the Identity refered to by the object's `created_by_ref`.
Args:
obj: The STIX object whose `created_by_ref` property will be looked
up.
Returns:
The STIX object's creator, or None, if the object contains no
`created_by_ref` property or the object's creator cannot be found.
"""
creator_id = obj.get('created_by_ref', '')
if creator_id:
return self.get(creator_id)
else:
return None

View File

@ -16,6 +16,7 @@ import uuid
from six import with_metaclass
from stix2.sources.filters import Filter
from stix2.utils import deduplicate
@ -58,7 +59,10 @@ class DataStore(object):
object specified by the "id".
"""
return self.source.get(*args, **kwargs)
try:
return self.source.get(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def all_versions(self, *args, **kwargs):
"""Retrieve all versions of a single STIX object by ID.
@ -72,7 +76,10 @@ class DataStore(object):
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(*args, **kwargs)
try:
return self.source.all_versions(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def query(self, *args, **kwargs):
"""Retrieve STIX objects matching a set of filters.
@ -87,7 +94,83 @@ class DataStore(object):
stix_objs (list): a list of STIX objects
"""
return self.source.query(*args, **kwargs)
try:
return self.source.query(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def creator_of(self, *args, **kwargs):
"""Retrieve the Identity refered to by the object's `created_by_ref`.
Translate creator_of() call to the appropriate DataSource call.
Args:
obj: The STIX object whose `created_by_ref` property will be looked
up.
Returns:
The STIX object's creator, or None, if the object contains no
`created_by_ref` property or the object's creator cannot be found.
"""
try:
return self.source.creator_of(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def relationships(self, *args, **kwargs):
"""Retrieve Relationships involving the given STIX object.
Translate relationships() call to the appropriate DataSource call.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
If None, all relationships will be returned, regardless of type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
try:
return self.source.relationships(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def related_to(self, *args, **kwargs):
"""Retrieve STIX Objects that have a Relationship involving the given
STIX object.
Translate related_to() call to the appropriate DataSource call.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
related objects will be looked up.
relationship_type (str): Only retrieve objects related by this
Relationships type. If None, all related objects will be
returned, regardless of type.
source_only (bool): Only examine Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of STIX objects related to the given STIX object.
"""
try:
return self.source.related_to(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
@ -99,7 +182,10 @@ class DataStore(object):
stix_objs (list): a list of STIX objects
"""
return self.sink.add(*args, **kwargs)
try:
return self.sink.add(*args, **kwargs)
except AttributeError:
raise AttributeError('%s has no data sink to put objects in' % self.__class__.__name__)
class DataSink(with_metaclass(ABCMeta)):
@ -191,6 +277,108 @@ class DataSource(with_metaclass(ABCMeta)):
"""
def creator_of(self, obj):
"""Retrieve the Identity refered to by the object's `created_by_ref`.
Args:
obj: The STIX object whose `created_by_ref` property will be looked
up.
Returns:
The STIX object's creator, or None, if the object contains no
`created_by_ref` property or the object's creator cannot be found.
"""
creator_id = obj.get('created_by_ref', '')
if creator_id:
return self.get(creator_id)
else:
return None
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
If None, all relationships will be returned, regardless of type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj['id']
except KeyError:
raise ValueError("STIX object has no 'id' property")
except TypeError:
# Assume `obj` is an ID string
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
def related_to(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve STIX Objects that have a Relationship involving the given
STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
related objects will be looked up.
relationship_type (str): Only retrieve objects related by this
Relationships type. If None, all related objects will be
returned, regardless of type.
source_only (bool): Only examine Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of STIX objects related to the given STIX object.
"""
results = []
rels = self.relationships(obj, relationship_type, source_only, target_only)
try:
obj_id = obj['id']
except TypeError:
# Assume `obj` is an ID string
obj_id = obj
# Get all unique ids from the relationships except that of the object
ids = set()
for r in rels:
ids.update((r.source_ref, r.target_ref))
ids.remove(obj_id)
for i in ids:
results.append(self.get(i))
return results
class CompositeDataSource(DataSource):
"""Controller for all the attached DataSources.
@ -354,6 +542,80 @@ class CompositeDataSource(DataSource):
return all_data
def relationships(self, *args, **kwargs):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Federated relationships retrieve method - iterates through all
DataSources defined in "data_sources".
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
If None, all relationships will be returned, regardless of type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
results = []
for ds in self.data_sources:
results.extend(ds.relationships(*args, **kwargs))
# remove exact duplicates (where duplicates are STIX 2.0
# objects with the same 'id' and 'modified' values)
if len(results) > 0:
results = deduplicate(results)
return results
def related_to(self, *args, **kwargs):
"""Retrieve STIX Objects that have a Relationship involving the given
STIX object.
Only one of `source_only` and `target_only` may be `True`.
Federated related objects method - iterates through all
DataSources defined in "data_sources".
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
related objects will be looked up.
relationship_type (str): Only retrieve objects related by this
Relationships type. If None, all related objects will be
returned, regardless of type.
source_only (bool): Only examine Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of STIX objects related to the given STIX object.
"""
if not self.has_data_sources():
raise AttributeError('CompositeDataSource has no data sources')
results = []
for ds in self.data_sources:
results.extend(ds.related_to(*args, **kwargs))
# remove exact duplicates (where duplicates are STIX 2.0
# objects with the same 'id' and 'modified' values)
if len(results) > 0:
results = deduplicate(results)
return results
def add_data_source(self, data_source):
"""Attach a DataSource to CompositeDataSource instance

View File

@ -10,6 +10,11 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
"""Supported filter value types"""
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
try:
FILTER_VALUE_TYPES.append(unicode)
except NameError:
# Python 3 doesn't need to worry about unicode
pass
def _check_filter_components(prop, op, value):

View File

@ -28,6 +28,18 @@ MARKING_IDS = [
"marking-definition--68520ae2-fefe-43a9-84ee-2c2a934d2c7d",
"marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f",
]
RELATIONSHIP_IDS = [
'relationship--06520621-5352-4e6a-b976-e8fa3d437ffd',
'relationship--181c9c09-43e6-45dd-9374-3bec192f05ef',
'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70'
]
# All required args for a Campaign instance
CAMPAIGN_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# All required args for a Campaign instance, plus some optional args
CAMPAIGN_MORE_KWARGS = dict(

View File

@ -547,3 +547,11 @@ def test_composite_datasource_operations():
# nothing returns the same as cds1.query(query1) (the associated query is query2)
results = cds1.query([])
assert len(results) == 3
def test_composite_datastore_no_datasource():
cds = CompositeDataSource()
with pytest.raises(AttributeError) as excinfo:
cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert 'CompositeDataSource has no data source' in str(excinfo.value)

View File

@ -2,8 +2,22 @@ import pytest
import stix2
from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,
INDICATOR_KWARGS, MALWARE_ID)
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
@pytest.fixture
def ds():
cam = stix2.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
mal = stix2.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
rel1 = stix2.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = stix2.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = stix2.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
yield stix2.MemoryStore(stix_objs)
def test_object_factory_created_by_ref_str():
@ -150,6 +164,14 @@ def test_environment_no_datastore():
env.query(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
with pytest.raises(AttributeError) as excinfo:
env.relationships(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
with pytest.raises(AttributeError) as excinfo:
env.related_to(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
def test_environment_add_filters():
env = stix2.Environment(factory=stix2.ObjectFactory())
@ -186,7 +208,7 @@ def test_parse_malware():
assert mal.name == "Cryptolocker"
def test_created_by():
def test_creator_of():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
@ -197,7 +219,7 @@ def test_created_by():
assert creator is identity
def test_created_by_no_datasource():
def test_creator_of_no_datasource():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(factory=factory)
@ -208,7 +230,7 @@ def test_created_by_no_datasource():
assert 'Environment has no data source' in str(excinfo.value)
def test_created_by_not_found():
def test_creator_of_not_found():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
@ -216,3 +238,113 @@ def test_created_by_not_found():
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
creator = env.creator_of(ind)
assert creator is None
def test_creator_of_no_created_by_ref():
env = stix2.Environment(store=stix2.MemoryStore())
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
creator = env.creator_of(ind)
assert creator is None
def test_relationships(ds):
env = stix2.Environment(store=ds)
mal = env.get(MALWARE_ID)
resp = env.relationships(mal)
assert len(resp) == 3
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_no_id(ds):
env = stix2.Environment(store=ds)
mal = {
"type": "malware",
"name": "some variant"
}
with pytest.raises(ValueError) as excinfo:
env.relationships(mal)
assert "object has no 'id' property" in str(excinfo.value)
def test_relationships_by_type(ds):
env = stix2.Environment(store=ds)
mal = env.get(MALWARE_ID)
resp = env.relationships(mal, relationship_type='indicates')
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
def test_relationships_by_source(ds):
env = stix2.Environment(store=ds)
resp = env.relationships(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
def test_relationships_by_target(ds):
env = stix2.Environment(store=ds)
resp = env.relationships(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_type(ds):
env = stix2.Environment(store=ds)
resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
assert len(resp) == 1
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_source(ds):
env = stix2.Environment(store=ds)
with pytest.raises(ValueError) as excinfo:
env.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(ds):
env = stix2.Environment(store=ds)
mal = env.get(MALWARE_ID)
resp = env.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_no_id(ds):
env = stix2.Environment(store=ds)
mal = {
"type": "malware",
"name": "some variant"
}
with pytest.raises(ValueError) as excinfo:
env.related_to(mal)
assert "object has no 'id' property" in str(excinfo.value)
def test_related_to_by_source(ds):
env = stix2.Environment(store=ds)
resp = env.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == IDENTITY_ID
def test_related_to_by_target(ds):
env = stix2.Environment(store=ds)
resp = env.related_to(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)

View File

@ -4,7 +4,12 @@ import shutil
import pytest
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
FileSystemSource, FileSystemStore, Filter, properties)
FileSystemSource, FileSystemStore, Filter, Identity,
Indicator, Malware, Relationship, properties)
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@ -40,6 +45,25 @@ def fs_sink():
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture(scope='module')
def rel_fs_store():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
fs = FileSystemStore(FS_PATH)
for o in stix_objs:
fs.add(o)
yield fs
for o in stix_objs:
os.remove(os.path.join(FS_PATH, o.type, o.id + '.json'))
def test_filesystem_source_nonexistent_folder():
with pytest.raises(ValueError) as excinfo:
FileSystemSource('nonexistent-folder')
@ -375,3 +399,75 @@ def test_filesystem_custom_object(fs_store):
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
def test_relationships(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.relationships(mal)
assert len(resp) == 3
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_type(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.relationships(mal, relationship_type='indicates')
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
def test_relationships_by_source(rel_fs_store):
resp = rel_fs_store.relationships(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
def test_relationships_by_target(rel_fs_store):
resp = rel_fs_store.relationships(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_type(rel_fs_store):
resp = rel_fs_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
assert len(resp) == 1
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_source(rel_fs_store):
with pytest.raises(ValueError) as excinfo:
rel_fs_store.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_source(rel_fs_store):
resp = rel_fs_store.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_target(rel_fs_store):
resp = rel_fs_store.related_to(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)

View File

@ -3,10 +3,15 @@ import shutil
import pytest
from stix2 import (Bundle, Campaign, CustomObject, Filter, MemorySource,
MemoryStore, properties)
from stix2 import (Bundle, Campaign, CustomObject, Filter, Identity, Indicator,
Malware, MemorySource, MemoryStore, Relationship,
properties)
from stix2.sources import make_id
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -118,6 +123,19 @@ def mem_source():
yield MemorySource(STIX_OBJS1)
@pytest.fixture
def rel_mem_store():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
yield MemoryStore(stix_objs)
def test_memory_source_get(mem_source):
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
@ -287,3 +305,75 @@ def test_memory_store_custom_object(mem_store):
newobj_r = mem_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'
def test_relationships(rel_mem_store):
mal = rel_mem_store.get(MALWARE_ID)
resp = rel_mem_store.relationships(mal)
assert len(resp) == 3
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_type(rel_mem_store):
mal = rel_mem_store.get(MALWARE_ID)
resp = rel_mem_store.relationships(mal, relationship_type='indicates')
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
def test_relationships_by_source(rel_mem_store):
resp = rel_mem_store.relationships(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
def test_relationships_by_target(rel_mem_store):
resp = rel_mem_store.relationships(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_type(rel_mem_store):
resp = rel_mem_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
assert len(resp) == 1
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_source(rel_mem_store):
with pytest.raises(ValueError) as excinfo:
rel_mem_store.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(rel_mem_store):
mal = rel_mem_store.get(MALWARE_ID)
resp = rel_mem_store.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_source(rel_mem_store):
resp = rel_mem_store.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_target(rel_mem_store):
resp = rel_mem_store.related_to(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)