Add relationships() function to Environment

stix2.0
Chris Lenk 2017-11-02 16:18:41 -04:00
parent ef6dade6f6
commit f9ad7ceb65
3 changed files with 120 additions and 2 deletions

View File

@ -2,6 +2,7 @@ import copy
from .core import parse as _parse
from .sources import CompositeDataSource, DataStore
from .sources.filters import Filter
class ObjectFactory(object):
@ -171,3 +172,42 @@ class Environment(object):
return self.get(creator_id)
else:
return None
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj.get('id', '')
except AttributeError:
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results

View File

@ -29,6 +29,13 @@ MARKING_IDS = [
"marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f",
]
# All required args for a Campaign instance
CAMPAIGN_KWARGS = dict(
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# All required args for a Campaign instance, plus some optional args
CAMPAIGN_MORE_KWARGS = dict(
type='campaign',

View File

@ -2,8 +2,26 @@ import pytest
import stix2
from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,
INDICATOR_KWARGS, MALWARE_ID)
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
MALWARE_ID, MALWARE_KWARGS)
RELATIONSHIP_ID1 = 'relationship--06520621-5352-4e6a-b976-e8fa3d437ffd'
RELATIONSHIP_ID2 = 'relationship--181c9c09-43e6-45dd-9374-3bec192f05ef'
RELATIONSHIP_ID3 = 'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70'
@pytest.fixture
def ds():
cam = stix2.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
mal = stix2.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
rel1 = stix2.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_ID1)
rel2 = stix2.Relationship(mal, 'targets', idy, id=RELATIONSHIP_ID2)
rel3 = stix2.Relationship(cam, 'uses', mal, id=RELATIONSHIP_ID3)
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
yield stix2.MemoryStore(stix_objs)
def test_object_factory_created_by_ref_str():
@ -216,3 +234,56 @@ def test_created_by_not_found():
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
creator = env.creator_of(ind)
assert creator is None
def test_relationships(ds):
env = stix2.Environment(store=ds)
mal = env.get(MALWARE_ID)
resp = env.relationships(mal)
assert len(resp) == 3
assert any(x['id'] == RELATIONSHIP_ID1 for x in resp)
assert any(x['id'] == RELATIONSHIP_ID2 for x in resp)
assert any(x['id'] == RELATIONSHIP_ID3 for x in resp)
def test_relationships_by_type(ds):
env = stix2.Environment(store=ds)
mal = env.get(MALWARE_ID)
resp = env.relationships(mal, relationship_type='indicates')
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_ID1
def test_relationships_by_source(ds):
env = stix2.Environment(store=ds)
resp = env.relationships(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_ID2
def test_relationships_by_target(ds):
env = stix2.Environment(store=ds)
resp = env.relationships(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == RELATIONSHIP_ID1 for x in resp)
assert any(x['id'] == RELATIONSHIP_ID3 for x in resp)
def test_relationships_by_target_and_type(ds):
env = stix2.Environment(store=ds)
resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
assert len(resp) == 1
assert any(x['id'] == RELATIONSHIP_ID3 for x in resp)
def test_relationships_by_target_and_source(ds):
env = stix2.Environment(store=ds)
with pytest.raises(ValueError) as excinfo:
env.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)