From 4dfb5d2365e851aa7bee0cfd60bbbef7fbb6528a Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Fri, 8 Sep 2017 09:01:12 -0400 Subject: [PATCH] Test Environment layer --- stix2/__init__.py | 9 +++- stix2/environment.py | 63 +++++++++++++++++-------- stix2/sources/__init__.py | 9 ++++ stix2/test/test_environment.py | 85 +++++++++++++++++++++++++++++++++- 4 files changed, 144 insertions(+), 22 deletions(-) diff --git a/stix2/__init__.py b/stix2/__init__.py index c2aae2e..35b65b0 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -7,7 +7,7 @@ from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) from .core import Bundle, _register_type, parse -from .environment import ObjectFactory +from .environment import Environment, ObjectFactory from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, @@ -41,6 +41,13 @@ from .patterns import (AndBooleanExpression, AndObservationExpression, from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) +from .sources import CompositeDataSource +from .sources.filesystem import (FileSystemSink, FileSystemSource, + FileSystemStore) +from .sources.filters import Filter +from .sources.memory import MemorySink, MemorySource, MemoryStore +from .sources.taxii import (TAXIICollectionSink, TAXIICollectionSource, + TAXIICollectionStore) from .sro import Relationship, Sighting from .utils import get_dict, new_version, revoke from .version import __version__ diff --git a/stix2/environment.py b/stix2/environment.py index 91f0676..93f8ba3 100644 --- a/stix2/environment.py +++ b/stix2/environment.py @@ -80,7 +80,7 @@ class Environment(object): Invalid if `store` is also provided. """ - def __init__(self, factory=None, store=None, source=None, sink=None): + def __init__(self, factory=ObjectFactory(), store=None, source=None, sink=None): self.factory = factory self.source = CompositeDataSource() if store: @@ -93,32 +93,55 @@ class Environment(object): raise ValueError("Data store already provided! Environment may only have one data sink.") self.sink = sink - def create(self, *args, **kwargs): - """Use the object factory to create a STIX object with default property values. - """ - return self.factory.create(*args, **kwargs) + def create(self, *args, **kwargs): + """Use the object factory to create a STIX object with default property values. + """ + return self.factory.create(*args, **kwargs) - def get(self, *args, **kwargs): - """Retrieve the most recent version of a single STIX object by ID. - """ + def get(self, *args, **kwargs): + """Retrieve the most recent version of a single STIX object by ID. + """ + try: return self.source.get(*args, **kwargs) + except AttributeError: + raise AttributeError('Environment has no data source to query') - def all_versions(self, *args, **kwargs): - """Retrieve all versions of a single STIX object by ID. - """ + def all_versions(self, *args, **kwargs): + """Retrieve all versions of a single STIX object by ID. + """ + try: return self.source.all_versions(*args, **kwargs) + except AttributeError: + raise AttributeError('Environment has no data source to query') - def query(self, *args, **kwargs): - """Retrieve STIX objects matching a set of filters. - """ + def query(self, *args, **kwargs): + """Retrieve STIX objects matching a set of filters. + """ + try: return self.source.query(*args, **kwargs) + except AttributeError: + raise AttributeError('Environment has no data source to query') - def add_filter(self, *args, **kwargs): - """Add a filter to be applied to all queries for STIX objects from this environment. - """ + def add_filters(self, *args, **kwargs): + """Add multiple filters to be applied to all queries for STIX objects from this environment. + """ + try: + return self.source.add_filters(*args, **kwargs) + except AttributeError: + raise AttributeError('Environment has no data source') + + def add_filter(self, *args, **kwargs): + """Add a filter to be applied to all queries for STIX objects from this environment. + """ + try: return self.source.add_filter(*args, **kwargs) + except AttributeError: + raise AttributeError('Environment has no data source') - def add(self, *args, **kwargs): - """Store a STIX object. - """ + def add(self, *args, **kwargs): + """Store a STIX object. + """ + try: return self.sink.add(*args, **kwargs) + except AttributeError: + raise AttributeError('Environment has no data sink to put objects in') diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 08063f5..e76495e 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -346,6 +346,9 @@ class CompositeDataSource(DataSource): stix_obj (dict): the STIX object to be returned. """ + if not self.get_all_data_sources(): + raise AttributeError('CompositeDataSource has no data sources') + all_data = [] # for every configured Data Source, call its retrieve handler @@ -382,6 +385,9 @@ class CompositeDataSource(DataSource): all_data (list): list of STIX objects that have the specified id """ + if not self.get_all_data_sources(): + raise AttributeError('CompositeDataSource has no data sources') + all_data = [] all_filters = self.filters @@ -416,6 +422,9 @@ class CompositeDataSource(DataSource): all_data (list): list of STIX objects to be returned """ + if not self.get_all_data_sources(): + raise AttributeError('CompositeDataSource has no data sources') + if not query: query = [] diff --git a/stix2/test/test_environment.py b/stix2/test/test_environment.py index 9be8101..26fd1b4 100644 --- a/stix2/test/test_environment.py +++ b/stix2/test/test_environment.py @@ -1,6 +1,8 @@ +import pytest + import stix2 -from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, +from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS) @@ -81,3 +83,84 @@ def test_object_factory_list_replace(): ind = factory.create(stix2.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS) assert len(ind.external_references) == 1 assert ind.external_references[0].source_name == "Yet Another Threat Report" + + +def test_environment_functions(): + env = stix2.Environment(stix2.ObjectFactory(created_by_ref=IDENTITY_ID), + stix2.MemoryStore()) + + # Create a STIX object + ind = env.create(stix2.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS) + assert ind.created_by_ref == IDENTITY_ID + + # Add objects to datastore + ind2 = ind.new_version(labels=['benign']) + env.add([ind, ind2]) + + # Get both versions of the object + resp = env.all_versions(INDICATOR_ID) + assert len(resp) == 1 # should be 2, but MemoryStore only keeps 1 version of objects + + # Get just the most recent version of the object + resp = env.get(INDICATOR_ID) + assert resp['labels'][0] == 'benign' + + # Search on something other than id + query = [stix2.Filter('type', '=', 'vulnerability')] + resp = env.query(query) + assert len(resp) == 0 + + # See different results after adding filters to the environment + env.add_filters([stix2.Filter('type', '=', 'indicator'), + stix2.Filter('created_by_ref', '=', IDENTITY_ID)]) + env.add_filter(stix2.Filter('labels', '=', 'benign')) # should be 'malicious-activity' + resp = env.get(INDICATOR_ID) + assert resp['labels'][0] == 'benign' # should be 'malicious-activity' + + +def test_environment_source_and_sink(): + ind = stix2.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) + env = stix2.Environment(source=stix2.MemorySource([ind]), sink=stix2.MemorySink([ind])) + assert env.get(INDICATOR_ID).labels[0] == 'malicious-activity' + + +def test_environment_datastore_and_sink(): + with pytest.raises(ValueError) as excinfo: + stix2.Environment(factory=stix2.ObjectFactory(), + store=stix2.MemoryStore(), sink=stix2.MemorySink) + assert 'Data store already provided' in str(excinfo.value) + + +def test_environment_no_datastore(): + env = stix2.Environment(factory=stix2.ObjectFactory()) + + with pytest.raises(AttributeError) as excinfo: + env.add(stix2.Indicator(**INDICATOR_KWARGS)) + assert 'Environment has no data sink to put objects in' in str(excinfo.value) + + with pytest.raises(AttributeError) as excinfo: + env.get(INDICATOR_ID) + assert 'Environment has no data source' in str(excinfo.value) + + with pytest.raises(AttributeError) as excinfo: + env.all_versions(INDICATOR_ID) + assert 'Environment has no data source' in str(excinfo.value) + + with pytest.raises(AttributeError) as excinfo: + env.query(INDICATOR_ID) + assert 'Environment has no data source' in str(excinfo.value) + + with pytest.raises(AttributeError) as excinfo: + env.add_filters(INDICATOR_ID) + assert 'Environment has no data source' in str(excinfo.value) + + with pytest.raises(AttributeError) as excinfo: + env.add_filter(INDICATOR_ID) + assert 'Environment has no data source' in str(excinfo.value) + + +def test_environment_datastore_and_no_object_factory(): + # Uses a default object factory + env = stix2.Environment(store=stix2.MemoryStore()) + ind = env.create(stix2.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS) + assert ind.id == INDICATOR_ID