Move relationships() to DataSources

stix2.0
Chris Lenk 2017-11-15 10:37:17 -05:00
parent f9ad7ceb65
commit 55cf00d7f0
5 changed files with 209 additions and 37 deletions

View File

@ -2,7 +2,6 @@ import copy
from .core import parse as _parse
from .sources import CompositeDataSource, DataStore
from .sources.filters import Filter
class ObjectFactory(object):
@ -173,41 +172,9 @@ class Environment(object):
else:
return None
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
def relationships(self, *args, **kwargs):
try:
obj_id = obj.get('id', '')
return self.source.relationships(*args, **kwargs)
except AttributeError:
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
raise AttributeError('Environment has no data source')
relationships.__doc__ = DataStore.relationships.__doc__

View File

@ -16,6 +16,7 @@ import uuid
from six import with_metaclass
from stix2.sources.filters import Filter
from stix2.utils import deduplicate
@ -89,6 +90,28 @@ class DataStore(object):
"""
return self.source.query(*args, **kwargs)
def relationships(self, *args, **kwargs):
"""Retrieve Relationships involving the given STIX object.
Translate relationships() call to the appropriate DataSource call.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
return self.source.relationships(*args, **kwargs)
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
@ -191,6 +214,28 @@ class DataSource(with_metaclass(ABCMeta)):
"""
@abstractmethod
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""
Implement: The specific data source API calls, processing,
functionality required for dereferencing relationships.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
class CompositeDataSource(DataSource):
"""Controller for all the attached DataSources.
@ -354,6 +399,49 @@ class CompositeDataSource(DataSource):
return all_data
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Federated relationships retrieve method - iterates through all
DataSources defined in "data_sources".
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj.get('id', '')
except AttributeError:
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
for ds in self.data_sources:
if not target_only:
results.extend(ds.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(ds.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
def add_data_source(self, data_source):
"""Attach a DataSource to CompositeDataSource instance

View File

@ -308,6 +308,45 @@ class FileSystemSource(DataSource):
return stix_objs
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj.get('id', '')
except AttributeError:
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
def _parse_file_filters(self, query):
"""Extract STIX common filters.

View File

@ -301,6 +301,45 @@ class MemorySource(DataSource):
return all_data
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj.get('id', '')
except AttributeError:
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
def load_from_file(self, file_path, allow_custom=False, version=None):
file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r"))

View File

@ -222,6 +222,45 @@ class TAXIICollectionSource(DataSource):
return stix_objs
def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
"""Retrieve Relationships involving the given STIX object.
Only one of `source_only` and `target_only` may be `True`.
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
(list): List of Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj.get('id', '')
except AttributeError:
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
def _parse_taxii_filters(self, query):
"""Parse out TAXII filters that the TAXII server can filter on.