Add related_to()

Function for calling relationships() but instead of just returning the
Relationship objects, returns the STIX objects being refered to in those
Relationships.
stix2.0
Chris Lenk 2017-11-16 14:58:59 -05:00
parent 86f28644f9
commit 29dec997a0
5 changed files with 152 additions and 0 deletions

View File

@ -178,3 +178,10 @@ class Environment(object):
except AttributeError:
raise AttributeError('Environment has no data source')
relationships.__doc__ = DataStore.relationships.__doc__
def related_to(self, *args, **kwargs):
try:
return self.source.related_to(*args, **kwargs)
except AttributeError:
raise AttributeError('Environment has no data source')
related_to.__doc__ = DataStore.related_to.__doc__

View File

@ -112,6 +112,30 @@ class DataStore(object):
"""
return self.source.relationships(*args, **kwargs)
def related_to(self, *args, **kwargs):
"""Retrieve STIX Objects that have a Relationship involving the given
STIX object.
Translate related_to() call to the appropriate DataSource call.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
related objects will be looked up.
relationship_type (str): Only retrieve objects related by this
Relationships type.
source_only (bool): Only examine Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of STIX objects related to the given STIX object.
"""
return self.source.related_to(*args, **kwargs)
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
@ -236,6 +260,48 @@ class DataSource(with_metaclass(ABCMeta)):
"""
def related_to(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve STIX Objects that have a Relationship involving the given
STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
related objects will be looked up.
relationship_type (str): Only retrieve objects related by this
Relationships type.
source_only (bool): Only examine Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only examine Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of STIX objects related to the given STIX object.
"""
results = []
rels = self.relationships(obj, relationship_type, source_only, target_only)
try:
obj_id = obj.get('id', '')
except AttributeError:
obj_id = obj
for r in rels:
if not source_only:
# relationships() found relationships where target_ref is obj_id
source_id = r.source_ref
if source_id != obj_id: # needed if target_only is also false
results.append(self.get(source_id))
if not target_only:
# relationships() found relationships where source_ref is obj_id
target_id = r.target_ref
if target_id != obj_id: # needed if source_only is also false
results.append(self.get(target_id))
return results
class CompositeDataSource(DataSource):
"""Controller for all the attached DataSources.

View File

@ -283,3 +283,31 @@ def test_relationships_by_target_and_source(ds):
env.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(ds):
env = stix2.Environment(store=ds)
mal = env.get(MALWARE_ID)
resp = env.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_source(ds):
env = stix2.Environment(store=ds)
resp = env.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == IDENTITY_ID
def test_related_to_by_target(ds):
env = stix2.Environment(store=ds)
resp = env.related_to(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)

View File

@ -446,3 +446,28 @@ def test_relationships_by_target_and_source(rel_fs_store):
rel_fs_store.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_source(rel_fs_store):
resp = rel_fs_store.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_target(rel_fs_store):
resp = rel_fs_store.related_to(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)

View File

@ -352,3 +352,29 @@ def test_relationships_by_target_and_source(rel_mem_store):
rel_mem_store.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(rel_mem_store):
mal = rel_mem_store.get(MALWARE_ID)
resp = rel_mem_store.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_source(rel_mem_store):
resp = rel_mem_store.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_target(rel_mem_store):
resp = rel_mem_store.related_to(MALWARE_ID, target_only=True)
print(resp)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)