Clean up relationships code
parent
92f7e706bf
commit
6446be310c
|
@ -327,8 +327,6 @@ class DataSource(with_metaclass(ABCMeta)):
|
|||
|
||||
try:
|
||||
obj_id = obj['id']
|
||||
except KeyError:
|
||||
raise ValueError("STIX object has no 'id' property")
|
||||
except TypeError:
|
||||
# Assume `obj` is an ID string
|
||||
obj_id = obj
|
||||
|
@ -510,7 +508,7 @@ class CompositeDataSource(DataSource):
|
|||
|
||||
return all_data
|
||||
|
||||
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
|
||||
def relationships(self, *args, **kwargs):
|
||||
"""Retrieve Relationships involving the given STIX object.
|
||||
|
||||
Only one of `source_only` and `target_only` may be `True`.
|
||||
|
@ -536,31 +534,17 @@ class CompositeDataSource(DataSource):
|
|||
raise AttributeError('CompositeDataSource has no data sources')
|
||||
|
||||
results = []
|
||||
filters = [Filter('type', '=', 'relationship')]
|
||||
|
||||
try:
|
||||
obj_id = obj['id']
|
||||
except KeyError:
|
||||
raise ValueError("STIX object has no 'id' property")
|
||||
except TypeError:
|
||||
# Assume `obj` is an ID string
|
||||
obj_id = obj
|
||||
|
||||
if relationship_type:
|
||||
filters.append(Filter('relationship_type', '=', relationship_type))
|
||||
|
||||
if source_only and target_only:
|
||||
raise ValueError("Search either source only or target only, but not both")
|
||||
|
||||
for ds in self.data_sources:
|
||||
if not target_only:
|
||||
results.extend(ds.query(filters + [Filter('source_ref', '=', obj_id)]))
|
||||
if not source_only:
|
||||
results.extend(ds.query(filters + [Filter('target_ref', '=', obj_id)]))
|
||||
results.extend(ds.relationships(*args, **kwargs))
|
||||
|
||||
# remove exact duplicates (where duplicates are STIX 2.0
|
||||
# objects with the same 'id' and 'modified' values)
|
||||
if len(results) > 0:
|
||||
results = deduplicate(results)
|
||||
|
||||
return results
|
||||
|
||||
def related_to(self, obj, relationship_type=None, source_only=False, target_only=False):
|
||||
def related_to(self, *args, **kwargs):
|
||||
"""Retrieve STIX Objects that have a Relationship involving the given
|
||||
STIX object.
|
||||
|
||||
|
@ -589,28 +573,12 @@ class CompositeDataSource(DataSource):
|
|||
|
||||
results = []
|
||||
for ds in self.data_sources:
|
||||
rels = ds.relationships(obj, relationship_type, source_only, target_only)
|
||||
results.extend(ds.related_to(*args, **kwargs))
|
||||
|
||||
try:
|
||||
obj_id = obj['id']
|
||||
except KeyError:
|
||||
raise ValueError("STIX object has no 'id' property")
|
||||
except TypeError:
|
||||
# Assume `obj` is an ID string
|
||||
obj_id = obj
|
||||
|
||||
for ds in self.data_sources:
|
||||
for r in rels:
|
||||
if not source_only:
|
||||
# relationships() found relationships where target_ref is obj_id
|
||||
source_id = r.source_ref
|
||||
if source_id != obj_id: # needed if target_only is also false
|
||||
results.append(ds.get(source_id))
|
||||
if not target_only:
|
||||
# relationships() found relationships where source_ref is obj_id
|
||||
target_id = r.target_ref
|
||||
if target_id != obj_id: # needed if source_only is also false
|
||||
results.append(ds.get(target_id))
|
||||
# remove exact duplicates (where duplicates are STIX 2.0
|
||||
# objects with the same 'id' and 'modified' values)
|
||||
if len(results) > 0:
|
||||
results = deduplicate(results)
|
||||
|
||||
return results
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ def fs_sink():
|
|||
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@pytest.fixture(scope='module')
|
||||
def rel_fs_store():
|
||||
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
|
||||
idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
|
||||
|
|
Loading…
Reference in New Issue