diff --git a/.travis.yml b/.travis.yml
index aba764d..0d5a046 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,6 @@ language: python
cache: pip
python:
- "2.7"
- - "3.3"
- "3.4"
- "3.5"
- "3.5-dev"
@@ -16,6 +15,6 @@ install:
- pip install codecov
script:
- tox
- - if [[ $TRAVIS_PYTHON_VERSION != 2.6 ]]; then pre-commit run --all-files; fi
+ - pre-commit run --all-files
after_success:
- codecov
diff --git a/docs/guide/datastore.ipynb b/docs/guide/datastore.ipynb
index 49ea4fd..866b432 100644
--- a/docs/guide/datastore.ipynb
+++ b/docs/guide/datastore.ipynb
@@ -23,7 +23,7 @@
},
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": 40,
"metadata": {
"collapsed": true,
"nbsphinx": "hidden"
@@ -454,6 +454,277 @@
"# attach multiple filters to a MemoryStore\n",
"mem.source.filters.update([f1,f2])"
]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## De-Referencing Relationships\n",
+ "\n",
+ "Given a STIX object, there are several ways to find other STIX objects related to it. To illustrate this, let's first create a [DataStore](../api/stix2.sources.rst#stix2.sources.DataStore) and add some objects and relationships."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from stix2 import Campaign, Identity, Indicator, Malware, Relationship\n",
+ "\n",
+ "mem = MemoryStore()\n",
+ "cam = Campaign(name='Charge', description='Attack!')\n",
+ "idy = Identity(name='John Doe', identity_class=\"individual\")\n",
+ "ind = Indicator(labels=['malicious-activity'], pattern=\"[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']\")\n",
+ "mal = Malware(labels=['ransomware'], name=\"Cryptolocker\", created_by_ref=idy)\n",
+ "rel1 = Relationship(ind, 'indicates', mal,)\n",
+ "rel2 = Relationship(mal, 'targets', idy)\n",
+ "rel3 = Relationship(cam, 'uses', mal)\n",
+ "mem.add([cam, idy, ind, mal, rel1, rel2, rel3])"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "If a STIX object has a `created_by_ref` property, you can use the [creator_of()](../api/stix2.sources.rst#stix2.sources.DataSource.creator_of) method to retrieve the [Identity](../api/stix2.v20.sdo.rst#stix2.v20.sdo.Identity) object that created it."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "
{\n",
+ " "type": "identity",\n",
+ " "id": "identity--be3baac0-9aba-48a8-81e4-4408b1c379a8",\n",
+ " "created": "2017-11-21T22:14:45.213Z",\n",
+ " "modified": "2017-11-21T22:14:45.213Z",\n",
+ " "name": "John Doe",\n",
+ " "identity_class": "individual"\n",
+ "}\n",
+ "
\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "print(mem.creator_of(mal))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Use the [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) method to retrieve all the relationship objects that reference a STIX object."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "3"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rels = mem.relationships(mal)\n",
+ "len(rels)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can limit it to only specific relationship types:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Relationship(type='relationship', id='relationship--bd6fd399-c907-4feb-b1da-b90f15942f1d', created='2017-11-21T22:14:45.214Z', modified='2017-11-21T22:14:45.214Z', relationship_type=u'indicates', source_ref='indicator--5ee33ff0-c50d-456b-a8dd-b5d1b69a66e8', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4')]"
+ ]
+ },
+ "execution_count": 27,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "mem.relationships(mal, relationship_type='indicates')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can limit it to only relationships where the given object is the source:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Relationship(type='relationship', id='relationship--7eb7f5cd-8bf2-4f7c-8756-84c0b5693b9a', created='2017-11-21T22:14:45.215Z', modified='2017-11-21T22:14:45.215Z', relationship_type=u'targets', source_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4', target_ref='identity--be3baac0-9aba-48a8-81e4-4408b1c379a8')]"
+ ]
+ },
+ "execution_count": 28,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "mem.relationships(mal, source_only=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "And you can limit it to only relationships where the given object is the target:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 30,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Relationship(type='relationship', id='relationship--bd6fd399-c907-4feb-b1da-b90f15942f1d', created='2017-11-21T22:14:45.214Z', modified='2017-11-21T22:14:45.214Z', relationship_type=u'indicates', source_ref='indicator--5ee33ff0-c50d-456b-a8dd-b5d1b69a66e8', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4'),\n",
+ " Relationship(type='relationship', id='relationship--3c759d40-c92a-430e-aab6-77d5c5763302', created='2017-11-21T22:14:45.215Z', modified='2017-11-21T22:14:45.215Z', relationship_type=u'uses', source_ref='campaign--82ab7aa4-d13b-4e99-8a09-ebcba30668a7', target_ref='malware--66c0bc78-4e27-4d80-a565-a07e6eb6fba4')]"
+ ]
+ },
+ "execution_count": 30,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "mem.relationships(mal, target_only=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Finally, you can retrieve all STIX objects related to a given STIX object using [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to). This calls [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) but then performs the extra step of getting the objects that these Relationships point to. [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to) takes all the same arguments that [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) does."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 42,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Campaign(type='campaign', id='campaign--82ab7aa4-d13b-4e99-8a09-ebcba30668a7', created='2017-11-21T22:14:45.213Z', modified='2017-11-21T22:14:45.213Z', name=u'Charge', description=u'Attack!')]"
+ ]
+ },
+ "execution_count": 42,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "mem.related_to(mal, target_only=True, relationship_type='uses')"
+ ]
}
],
"metadata": {
diff --git a/docs/guide/environment.ipynb b/docs/guide/environment.ipynb
index 2d85911..0cb5796 100644
--- a/docs/guide/environment.ipynb
+++ b/docs/guide/environment.ipynb
@@ -128,7 +128,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "You can retrieve STIX objects from the [DataSources](../api/stix2.sources.rst#stix2.sources.DataSource) in the [Environment](../api/stix2.environment.rst#stix2.environment.Environment) with [get()](../api/stix2.environment.rst#stix2.environment.Environment.get), [query()](../api/stix2.environment.rst#stix2.environment.Environment.query), and [all_versions()](../api/stix2.environment.rst#stix2.environment.Environment.all_versions), just as you would for a [DataSource](../api/stix2.sources.rst#stix2.sources.DataSource)."
+ "You can retrieve STIX objects from the [DataSources](../api/stix2.sources.rst#stix2.sources.DataSource) in the [Environment](../api/stix2.environment.rst#stix2.environment.Environment) with [get()](../api/stix2.environment.rst#stix2.environment.Environment.get), [query()](../api/stix2.environment.rst#stix2.environment.Environment.query), [all_versions()](../api/stix2.environment.rst#stix2.environment.Environment.all_versions), [creator_of()](../api/stix2.sources.rst#stix2.sources.DataSource.creator_of), [related_to()](../api/stix2.sources.rst#stix2.sources.DataSource.related_to), and [relationships()](../api/stix2.sources.rst#stix2.sources.DataSource.relationships) just as you would for a [DataSource](../api/stix2.sources.rst#stix2.sources.DataSource)."
]
},
{
diff --git a/setup.py b/setup.py
index 72bc5d7..fa68616 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,6 @@ setup(
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
- 'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
diff --git a/stix2/environment.py b/stix2/environment.py
index 64a73b1..ab16b09 100644
--- a/stix2/environment.py
+++ b/stix2/environment.py
@@ -105,30 +105,13 @@ class Environment(object):
return self.factory.create(*args, **kwargs)
create.__doc__ = ObjectFactory.create.__doc__
- def get(self, *args, **kwargs):
- try:
- return self.source.get(*args, **kwargs)
- except AttributeError:
- raise AttributeError('Environment has no data source to query')
- get.__doc__ = DataStore.get.__doc__
-
- def all_versions(self, *args, **kwargs):
- """Retrieve all versions of a single STIX object by ID.
- """
- try:
- return self.source.all_versions(*args, **kwargs)
- except AttributeError:
- raise AttributeError('Environment has no data source to query')
- all_versions.__doc__ = DataStore.all_versions.__doc__
-
- def query(self, *args, **kwargs):
- """Retrieve STIX objects matching a set of filters.
- """
- try:
- return self.source.query(*args, **kwargs)
- except AttributeError:
- raise AttributeError('Environment has no data source to query')
- query.__doc__ = DataStore.query.__doc__
+ get = DataStore.__dict__['get']
+ all_versions = DataStore.__dict__['all_versions']
+ query = DataStore.__dict__['query']
+ creator_of = DataStore.__dict__['creator_of']
+ relationships = DataStore.__dict__['relationships']
+ related_to = DataStore.__dict__['related_to']
+ add = DataStore.__dict__['add']
def add_filters(self, *args, **kwargs):
try:
@@ -142,31 +125,6 @@ class Environment(object):
except AttributeError:
raise AttributeError('Environment has no data source')
- def add(self, *args, **kwargs):
- try:
- return self.sink.add(*args, **kwargs)
- except AttributeError:
- raise AttributeError('Environment has no data sink to put objects in')
- add.__doc__ = DataStore.add.__doc__
-
def parse(self, *args, **kwargs):
return _parse(*args, **kwargs)
parse.__doc__ = _parse.__doc__
-
- def creator_of(self, obj):
- """Retrieve the Identity refered to by the object's `created_by_ref`.
-
- Args:
- obj: The STIX object whose `created_by_ref` property will be looked
- up.
-
- Returns:
- The STIX object's creator, or None, if the object contains no
- `created_by_ref` property or the object's creator cannot be found.
-
- """
- creator_id = obj.get('created_by_ref', '')
- if creator_id:
- return self.get(creator_id)
- else:
- return None
diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py
index b3e8a29..adc6def 100644
--- a/stix2/sources/__init__.py
+++ b/stix2/sources/__init__.py
@@ -16,6 +16,7 @@ import uuid
from six import with_metaclass
+from stix2.sources.filters import Filter
from stix2.utils import deduplicate
@@ -58,7 +59,10 @@ class DataStore(object):
object specified by the "id".
"""
- return self.source.get(*args, **kwargs)
+ try:
+ return self.source.get(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def all_versions(self, *args, **kwargs):
"""Retrieve all versions of a single STIX object by ID.
@@ -72,7 +76,10 @@ class DataStore(object):
stix_objs (list): a list of STIX objects
"""
- return self.source.all_versions(*args, **kwargs)
+ try:
+ return self.source.all_versions(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def query(self, *args, **kwargs):
"""Retrieve STIX objects matching a set of filters.
@@ -87,7 +94,83 @@ class DataStore(object):
stix_objs (list): a list of STIX objects
"""
- return self.source.query(*args, **kwargs)
+ try:
+ return self.source.query(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data source to query' % self.__class__.__name__)
+
+ def creator_of(self, *args, **kwargs):
+ """Retrieve the Identity refered to by the object's `created_by_ref`.
+
+ Translate creator_of() call to the appropriate DataSource call.
+
+ Args:
+ obj: The STIX object whose `created_by_ref` property will be looked
+ up.
+
+ Returns:
+ The STIX object's creator, or None, if the object contains no
+ `created_by_ref` property or the object's creator cannot be found.
+
+ """
+ try:
+ return self.source.creator_of(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data source to query' % self.__class__.__name__)
+
+ def relationships(self, *args, **kwargs):
+ """Retrieve Relationships involving the given STIX object.
+
+ Translate relationships() call to the appropriate DataSource call.
+
+ Only one of `source_only` and `target_only` may be `True`.
+
+ Args:
+ obj (STIX object OR dict OR str): The STIX object (or its ID) whose
+ relationships will be looked up.
+ relationship_type (str): Only retrieve Relationships of this type.
+ If None, all relationships will be returned, regardless of type.
+ source_only (bool): Only retrieve Relationships for which this
+ object is the source_ref. Default: False.
+ target_only (bool): Only retrieve Relationships for which this
+ object is the target_ref. Default: False.
+
+ Returns:
+ (list): List of Relationship objects involving the given STIX object.
+
+ """
+ try:
+ return self.source.relationships(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data source to query' % self.__class__.__name__)
+
+ def related_to(self, *args, **kwargs):
+ """Retrieve STIX Objects that have a Relationship involving the given
+ STIX object.
+
+ Translate related_to() call to the appropriate DataSource call.
+
+ Only one of `source_only` and `target_only` may be `True`.
+
+ Args:
+ obj (STIX object OR dict OR str): The STIX object (or its ID) whose
+ related objects will be looked up.
+ relationship_type (str): Only retrieve objects related by this
+ Relationships type. If None, all related objects will be
+ returned, regardless of type.
+ source_only (bool): Only examine Relationships for which this
+ object is the source_ref. Default: False.
+ target_only (bool): Only examine Relationships for which this
+ object is the target_ref. Default: False.
+
+ Returns:
+ (list): List of STIX objects related to the given STIX object.
+
+ """
+ try:
+ return self.source.related_to(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data source to query' % self.__class__.__name__)
def add(self, *args, **kwargs):
"""Method for storing STIX objects.
@@ -99,7 +182,10 @@ class DataStore(object):
stix_objs (list): a list of STIX objects
"""
- return self.sink.add(*args, **kwargs)
+ try:
+ return self.sink.add(*args, **kwargs)
+ except AttributeError:
+ raise AttributeError('%s has no data sink to put objects in' % self.__class__.__name__)
class DataSink(with_metaclass(ABCMeta)):
@@ -191,6 +277,108 @@ class DataSource(with_metaclass(ABCMeta)):
"""
+ def creator_of(self, obj):
+ """Retrieve the Identity refered to by the object's `created_by_ref`.
+
+ Args:
+ obj: The STIX object whose `created_by_ref` property will be looked
+ up.
+
+ Returns:
+ The STIX object's creator, or None, if the object contains no
+ `created_by_ref` property or the object's creator cannot be found.
+
+ """
+ creator_id = obj.get('created_by_ref', '')
+ if creator_id:
+ return self.get(creator_id)
+ else:
+ return None
+
+ def relationships(self, obj, relationship_type=None, source_only=False, target_only=False):
+ """Retrieve Relationships involving the given STIX object.
+
+ Only one of `source_only` and `target_only` may be `True`.
+
+ Args:
+ obj (STIX object OR dict OR str): The STIX object (or its ID) whose
+ relationships will be looked up.
+ relationship_type (str): Only retrieve Relationships of this type.
+ If None, all relationships will be returned, regardless of type.
+ source_only (bool): Only retrieve Relationships for which this
+ object is the source_ref. Default: False.
+ target_only (bool): Only retrieve Relationships for which this
+ object is the target_ref. Default: False.
+
+ Returns:
+ (list): List of Relationship objects involving the given STIX object.
+
+ """
+ results = []
+ filters = [Filter('type', '=', 'relationship')]
+
+ try:
+ obj_id = obj['id']
+ except KeyError:
+ raise ValueError("STIX object has no 'id' property")
+ except TypeError:
+ # Assume `obj` is an ID string
+ obj_id = obj
+
+ if relationship_type:
+ filters.append(Filter('relationship_type', '=', relationship_type))
+
+ if source_only and target_only:
+ raise ValueError("Search either source only or target only, but not both")
+
+ if not target_only:
+ results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
+ if not source_only:
+ results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
+
+ return results
+
+ def related_to(self, obj, relationship_type=None, source_only=False, target_only=False):
+ """Retrieve STIX Objects that have a Relationship involving the given
+ STIX object.
+
+ Only one of `source_only` and `target_only` may be `True`.
+
+ Args:
+ obj (STIX object OR dict OR str): The STIX object (or its ID) whose
+ related objects will be looked up.
+ relationship_type (str): Only retrieve objects related by this
+ Relationships type. If None, all related objects will be
+ returned, regardless of type.
+ source_only (bool): Only examine Relationships for which this
+ object is the source_ref. Default: False.
+ target_only (bool): Only examine Relationships for which this
+ object is the target_ref. Default: False.
+
+ Returns:
+ (list): List of STIX objects related to the given STIX object.
+
+ """
+ results = []
+ rels = self.relationships(obj, relationship_type, source_only, target_only)
+
+ try:
+ obj_id = obj['id']
+ except TypeError:
+ # Assume `obj` is an ID string
+ obj_id = obj
+
+ # Get all unique ids from the relationships except that of the object
+ ids = set()
+ for r in rels:
+ ids.update((r.source_ref, r.target_ref))
+ ids.remove(obj_id)
+
+ for i in ids:
+ results.append(self.get(i))
+
+ return results
+
class CompositeDataSource(DataSource):
"""Controller for all the attached DataSources.
@@ -354,6 +542,80 @@ class CompositeDataSource(DataSource):
return all_data
+ def relationships(self, *args, **kwargs):
+ """Retrieve Relationships involving the given STIX object.
+
+ Only one of `source_only` and `target_only` may be `True`.
+
+ Federated relationships retrieve method - iterates through all
+ DataSources defined in "data_sources".
+
+ Args:
+ obj (STIX object OR dict OR str): The STIX object (or its ID) whose
+ relationships will be looked up.
+ relationship_type (str): Only retrieve Relationships of this type.
+ If None, all relationships will be returned, regardless of type.
+ source_only (bool): Only retrieve Relationships for which this
+ object is the source_ref. Default: False.
+ target_only (bool): Only retrieve Relationships for which this
+ object is the target_ref. Default: False.
+
+ Returns:
+ (list): List of Relationship objects involving the given STIX object.
+
+ """
+ if not self.has_data_sources():
+ raise AttributeError('CompositeDataSource has no data sources')
+
+ results = []
+ for ds in self.data_sources:
+ results.extend(ds.relationships(*args, **kwargs))
+
+ # remove exact duplicates (where duplicates are STIX 2.0
+ # objects with the same 'id' and 'modified' values)
+ if len(results) > 0:
+ results = deduplicate(results)
+
+ return results
+
+ def related_to(self, *args, **kwargs):
+ """Retrieve STIX Objects that have a Relationship involving the given
+ STIX object.
+
+ Only one of `source_only` and `target_only` may be `True`.
+
+ Federated related objects method - iterates through all
+ DataSources defined in "data_sources".
+
+ Args:
+ obj (STIX object OR dict OR str): The STIX object (or its ID) whose
+ related objects will be looked up.
+ relationship_type (str): Only retrieve objects related by this
+ Relationships type. If None, all related objects will be
+ returned, regardless of type.
+ source_only (bool): Only examine Relationships for which this
+ object is the source_ref. Default: False.
+ target_only (bool): Only examine Relationships for which this
+ object is the target_ref. Default: False.
+
+ Returns:
+ (list): List of STIX objects related to the given STIX object.
+
+ """
+ if not self.has_data_sources():
+ raise AttributeError('CompositeDataSource has no data sources')
+
+ results = []
+ for ds in self.data_sources:
+ results.extend(ds.related_to(*args, **kwargs))
+
+ # remove exact duplicates (where duplicates are STIX 2.0
+ # objects with the same 'id' and 'modified' values)
+ if len(results) > 0:
+ results = deduplicate(results)
+
+ return results
+
def add_data_source(self, data_source):
"""Attach a DataSource to CompositeDataSource instance
diff --git a/stix2/sources/filters.py b/stix2/sources/filters.py
index 5772112..5af48cd 100644
--- a/stix2/sources/filters.py
+++ b/stix2/sources/filters.py
@@ -10,6 +10,11 @@ FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
"""Supported filter value types"""
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
+try:
+ FILTER_VALUE_TYPES.append(unicode)
+except NameError:
+ # Python 3 doesn't need to worry about unicode
+ pass
def _check_filter_components(prop, op, value):
diff --git a/stix2/test/constants.py b/stix2/test/constants.py
index 839b547..3db39d6 100644
--- a/stix2/test/constants.py
+++ b/stix2/test/constants.py
@@ -28,6 +28,18 @@ MARKING_IDS = [
"marking-definition--68520ae2-fefe-43a9-84ee-2c2a934d2c7d",
"marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f",
]
+RELATIONSHIP_IDS = [
+ 'relationship--06520621-5352-4e6a-b976-e8fa3d437ffd',
+ 'relationship--181c9c09-43e6-45dd-9374-3bec192f05ef',
+ 'relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70'
+]
+
+# All required args for a Campaign instance
+CAMPAIGN_KWARGS = dict(
+ name="Green Group Attacks Against Finance",
+ description="Campaign by Green Group against a series of targets in the financial services sector.",
+)
+
# All required args for a Campaign instance, plus some optional args
CAMPAIGN_MORE_KWARGS = dict(
diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py
index 92d5d4c..7c1832b 100644
--- a/stix2/test/test_custom.py
+++ b/stix2/test/test_custom.py
@@ -94,6 +94,28 @@ def test_custom_property_in_bundled_object():
assert '"x_foo": "bar"' in str(bundle)
+def test_custom_marking_no_init_1():
+ @stix2.CustomMarking('x-new-obj', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ])
+ class NewObj():
+ pass
+
+ no = NewObj(property1='something')
+ assert no.property1 == 'something'
+
+
+def test_custom_marking_no_init_2():
+ @stix2.CustomMarking('x-new-obj2', [
+ ('property1', stix2.properties.StringProperty(required=True)),
+ ])
+ class NewObj2(object):
+ pass
+
+ no2 = NewObj2(property1='something')
+ assert no2.property1 == 'something'
+
+
@stix2.sdo.CustomObject('x-new-type', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
@@ -102,6 +124,15 @@ class NewType(object):
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
+
+
+def test_custom_object_raises_exception():
+ with pytest.raises(TypeError) as excinfo:
+ NewType(property1='something', property3='something', allow_custom=True)
+
+ assert str(excinfo.value) == "Must be integer!"
def test_custom_object_type():
@@ -117,7 +148,7 @@ def test_custom_object_type():
assert "'property2' is too small." in str(excinfo.value)
-def test_custom_object_no_init():
+def test_custom_object_no_init_1():
@stix2.sdo.CustomObject('x-new-obj', [
('property1', stix2.properties.StringProperty(required=True)),
])
@@ -127,6 +158,8 @@ def test_custom_object_no_init():
no = NewObj(property1='something')
assert no.property1 == 'something'
+
+def test_custom_object_no_init_2():
@stix2.sdo.CustomObject('x-new-obj2', [
('property1', stix2.properties.StringProperty(required=True)),
])
@@ -170,23 +203,36 @@ class NewObservable():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
-def test_custom_observable_object():
+def test_custom_observable_object_1():
no = NewObservable(property1='something')
assert no.property1 == 'something'
+
+def test_custom_observable_object_2():
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewObservable(property2=42)
assert excinfo.value.properties == ['property1']
assert "No values for required properties" in str(excinfo.value)
+
+def test_custom_observable_object_3():
with pytest.raises(ValueError) as excinfo:
NewObservable(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
-def test_custom_observable_object_no_init():
+def test_custom_observable_raises_exception():
+ with pytest.raises(TypeError) as excinfo:
+ NewObservable(property1='something', property3='something', allow_custom=True)
+
+ assert str(excinfo.value) == "Must be integer!"
+
+
+def test_custom_observable_object_no_init_1():
@stix2.observables.CustomObservable('x-new-observable', [
('property1', stix2.properties.StringProperty()),
])
@@ -196,6 +242,8 @@ def test_custom_observable_object_no_init():
no = NewObs(property1='something')
assert no.property1 == 'something'
+
+def test_custom_observable_object_no_init_2():
@stix2.observables.CustomObservable('x-new-obs2', [
('property1', stix2.properties.StringProperty()),
])
@@ -354,6 +402,15 @@ class NewExtension():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
+
+
+def test_custom_extension_raises_exception():
+ with pytest.raises(TypeError) as excinfo:
+ NewExtension(property1='something', property3='something', allow_custom=True)
+
+ assert str(excinfo.value) == "Must be integer!"
def test_custom_extension():
@@ -433,7 +490,7 @@ def test_custom_extension_empty_properties():
assert "'properties' must be a dict!" in str(excinfo.value)
-def test_custom_extension_no_init():
+def test_custom_extension_no_init_1():
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-extension', {
'property1': stix2.properties.StringProperty(required=True),
})
@@ -443,6 +500,8 @@ def test_custom_extension_no_init():
ne = NewExt(property1="foobar")
assert ne.property1 == "foobar"
+
+def test_custom_extension_no_init_2():
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext2', {
'property1': stix2.properties.StringProperty(required=True),
})
diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py
index ef0cf26..d7f238a 100644
--- a/stix2/test/test_data_sources.py
+++ b/stix2/test/test_data_sources.py
@@ -547,3 +547,11 @@ def test_composite_datasource_operations():
# nothing returns the same as cds1.query(query1) (the associated query is query2)
results = cds1.query([])
assert len(results) == 3
+
+
+def test_composite_datastore_no_datasource():
+ cds = CompositeDataSource()
+
+ with pytest.raises(AttributeError) as excinfo:
+ cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
+ assert 'CompositeDataSource has no data source' in str(excinfo.value)
diff --git a/stix2/test/test_environment.py b/stix2/test/test_environment.py
index c669a33..84ca803 100644
--- a/stix2/test/test_environment.py
+++ b/stix2/test/test_environment.py
@@ -2,8 +2,22 @@ import pytest
import stix2
-from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,
- INDICATOR_KWARGS, MALWARE_ID)
+from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID,
+ IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
+ MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
+
+
+@pytest.fixture
+def ds():
+ cam = stix2.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
+ idy = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
+ ind = stix2.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
+ mal = stix2.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
+ rel1 = stix2.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
+ rel2 = stix2.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
+ rel3 = stix2.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
+ stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
+ yield stix2.MemoryStore(stix_objs)
def test_object_factory_created_by_ref_str():
@@ -150,6 +164,14 @@ def test_environment_no_datastore():
env.query(INDICATOR_ID)
assert 'Environment has no data source' in str(excinfo.value)
+ with pytest.raises(AttributeError) as excinfo:
+ env.relationships(INDICATOR_ID)
+ assert 'Environment has no data source' in str(excinfo.value)
+
+ with pytest.raises(AttributeError) as excinfo:
+ env.related_to(INDICATOR_ID)
+ assert 'Environment has no data source' in str(excinfo.value)
+
def test_environment_add_filters():
env = stix2.Environment(factory=stix2.ObjectFactory())
@@ -186,7 +208,7 @@ def test_parse_malware():
assert mal.name == "Cryptolocker"
-def test_created_by():
+def test_creator_of():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
@@ -197,7 +219,7 @@ def test_created_by():
assert creator is identity
-def test_created_by_no_datasource():
+def test_creator_of_no_datasource():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(factory=factory)
@@ -208,7 +230,7 @@ def test_created_by_no_datasource():
assert 'Environment has no data source' in str(excinfo.value)
-def test_created_by_not_found():
+def test_creator_of_not_found():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
@@ -216,3 +238,113 @@ def test_created_by_not_found():
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
creator = env.creator_of(ind)
assert creator is None
+
+
+def test_creator_of_no_created_by_ref():
+ env = stix2.Environment(store=stix2.MemoryStore())
+ ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
+ creator = env.creator_of(ind)
+ assert creator is None
+
+
+def test_relationships(ds):
+ env = stix2.Environment(store=ds)
+ mal = env.get(MALWARE_ID)
+ resp = env.relationships(mal)
+
+ assert len(resp) == 3
+ assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_no_id(ds):
+ env = stix2.Environment(store=ds)
+ mal = {
+ "type": "malware",
+ "name": "some variant"
+ }
+ with pytest.raises(ValueError) as excinfo:
+ env.relationships(mal)
+ assert "object has no 'id' property" in str(excinfo.value)
+
+
+def test_relationships_by_type(ds):
+ env = stix2.Environment(store=ds)
+ mal = env.get(MALWARE_ID)
+ resp = env.relationships(mal, relationship_type='indicates')
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == RELATIONSHIP_IDS[0]
+
+
+def test_relationships_by_source(ds):
+ env = stix2.Environment(store=ds)
+ resp = env.relationships(MALWARE_ID, source_only=True)
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == RELATIONSHIP_IDS[1]
+
+
+def test_relationships_by_target(ds):
+ env = stix2.Environment(store=ds)
+ resp = env.relationships(MALWARE_ID, target_only=True)
+
+ assert len(resp) == 2
+ assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_target_and_type(ds):
+ env = stix2.Environment(store=ds)
+ resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
+
+ assert len(resp) == 1
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_target_and_source(ds):
+ env = stix2.Environment(store=ds)
+ with pytest.raises(ValueError) as excinfo:
+ env.relationships(MALWARE_ID, target_only=True, source_only=True)
+
+ assert 'not both' in str(excinfo.value)
+
+
+def test_related_to(ds):
+ env = stix2.Environment(store=ds)
+ mal = env.get(MALWARE_ID)
+ resp = env.related_to(mal)
+
+ assert len(resp) == 3
+ assert any(x['id'] == CAMPAIGN_ID for x in resp)
+ assert any(x['id'] == INDICATOR_ID for x in resp)
+ assert any(x['id'] == IDENTITY_ID for x in resp)
+
+
+def test_related_to_no_id(ds):
+ env = stix2.Environment(store=ds)
+ mal = {
+ "type": "malware",
+ "name": "some variant"
+ }
+ with pytest.raises(ValueError) as excinfo:
+ env.related_to(mal)
+ assert "object has no 'id' property" in str(excinfo.value)
+
+
+def test_related_to_by_source(ds):
+ env = stix2.Environment(store=ds)
+ resp = env.related_to(MALWARE_ID, source_only=True)
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == IDENTITY_ID
+
+
+def test_related_to_by_target(ds):
+ env = stix2.Environment(store=ds)
+ resp = env.related_to(MALWARE_ID, target_only=True)
+
+ assert len(resp) == 2
+ assert any(x['id'] == CAMPAIGN_ID for x in resp)
+ assert any(x['id'] == INDICATOR_ID for x in resp)
diff --git a/stix2/test/test_filesystem.py b/stix2/test/test_filesystem.py
index 729285a..020fee5 100644
--- a/stix2/test/test_filesystem.py
+++ b/stix2/test/test_filesystem.py
@@ -4,7 +4,12 @@ import shutil
import pytest
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
- FileSystemSource, FileSystemStore, Filter, properties)
+ FileSystemSource, FileSystemStore, Filter, Identity,
+ Indicator, Malware, Relationship, properties)
+
+from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
+ IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
+ MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@@ -40,6 +45,25 @@ def fs_sink():
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
+@pytest.fixture(scope='module')
+def rel_fs_store():
+ cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
+ idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
+ ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
+ mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
+ rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
+ rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
+ rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
+ stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
+ fs = FileSystemStore(FS_PATH)
+ for o in stix_objs:
+ fs.add(o)
+ yield fs
+
+ for o in stix_objs:
+ os.remove(os.path.join(FS_PATH, o.type, o.id + '.json'))
+
+
def test_filesystem_source_nonexistent_folder():
with pytest.raises(ValueError) as excinfo:
FileSystemSource('nonexistent-folder')
@@ -375,3 +399,75 @@ def test_filesystem_custom_object(fs_store):
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
+
+
+def test_relationships(rel_fs_store):
+ mal = rel_fs_store.get(MALWARE_ID)
+ resp = rel_fs_store.relationships(mal)
+
+ assert len(resp) == 3
+ assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_type(rel_fs_store):
+ mal = rel_fs_store.get(MALWARE_ID)
+ resp = rel_fs_store.relationships(mal, relationship_type='indicates')
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == RELATIONSHIP_IDS[0]
+
+
+def test_relationships_by_source(rel_fs_store):
+ resp = rel_fs_store.relationships(MALWARE_ID, source_only=True)
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == RELATIONSHIP_IDS[1]
+
+
+def test_relationships_by_target(rel_fs_store):
+ resp = rel_fs_store.relationships(MALWARE_ID, target_only=True)
+
+ assert len(resp) == 2
+ assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_target_and_type(rel_fs_store):
+ resp = rel_fs_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
+
+ assert len(resp) == 1
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_target_and_source(rel_fs_store):
+ with pytest.raises(ValueError) as excinfo:
+ rel_fs_store.relationships(MALWARE_ID, target_only=True, source_only=True)
+
+ assert 'not both' in str(excinfo.value)
+
+
+def test_related_to(rel_fs_store):
+ mal = rel_fs_store.get(MALWARE_ID)
+ resp = rel_fs_store.related_to(mal)
+
+ assert len(resp) == 3
+ assert any(x['id'] == CAMPAIGN_ID for x in resp)
+ assert any(x['id'] == INDICATOR_ID for x in resp)
+ assert any(x['id'] == IDENTITY_ID for x in resp)
+
+
+def test_related_to_by_source(rel_fs_store):
+ resp = rel_fs_store.related_to(MALWARE_ID, source_only=True)
+
+ assert len(resp) == 1
+ assert any(x['id'] == IDENTITY_ID for x in resp)
+
+
+def test_related_to_by_target(rel_fs_store):
+ resp = rel_fs_store.related_to(MALWARE_ID, target_only=True)
+
+ assert len(resp) == 2
+ assert any(x['id'] == CAMPAIGN_ID for x in resp)
+ assert any(x['id'] == INDICATOR_ID for x in resp)
diff --git a/stix2/test/test_markings.py b/stix2/test/test_markings.py
index 456bf92..d2271f0 100644
--- a/stix2/test/test_markings.py
+++ b/stix2/test/test_markings.py
@@ -187,7 +187,8 @@ def test_parse_marking_definition(data):
])
class NewMarking(object):
def __init__(self, property2=None, **kwargs):
- return
+ if "property3" in kwargs and not isinstance(kwargs.get("property3"), int):
+ raise TypeError("Must be integer!")
def test_registered_custom_marking():
@@ -208,6 +209,13 @@ def test_registered_custom_marking():
assert marking_def.definition_type == "x-new-marking-type"
+def test_registered_custom_marking_raises_exception():
+ with pytest.raises(TypeError) as excinfo:
+ NewMarking(property1='something', property3='something', allow_custom=True)
+
+ assert str(excinfo.value) == "Must be integer!"
+
+
def test_not_registered_marking_raises_exception():
with pytest.raises(ValueError) as excinfo:
# Used custom object on purpose to demonstrate a not-registered marking
diff --git a/stix2/test/test_memory.py b/stix2/test/test_memory.py
index d5651d0..e290ca9 100644
--- a/stix2/test/test_memory.py
+++ b/stix2/test/test_memory.py
@@ -3,10 +3,15 @@ import shutil
import pytest
-from stix2 import (Bundle, Campaign, CustomObject, Filter, MemorySource,
- MemoryStore, properties)
+from stix2 import (Bundle, Campaign, CustomObject, Filter, Identity, Indicator,
+ Malware, MemorySource, MemoryStore, Relationship,
+ properties)
from stix2.sources import make_id
+from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
+ IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
+ MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
+
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@@ -118,6 +123,19 @@ def mem_source():
yield MemorySource(STIX_OBJS1)
+@pytest.fixture
+def rel_mem_store():
+ cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
+ idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
+ ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
+ mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
+ rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
+ rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
+ rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
+ stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
+ yield MemoryStore(stix_objs)
+
+
def test_memory_source_get(mem_source):
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
@@ -287,3 +305,75 @@ def test_memory_store_custom_object(mem_store):
newobj_r = mem_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'
+
+
+def test_relationships(rel_mem_store):
+ mal = rel_mem_store.get(MALWARE_ID)
+ resp = rel_mem_store.relationships(mal)
+
+ assert len(resp) == 3
+ assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_type(rel_mem_store):
+ mal = rel_mem_store.get(MALWARE_ID)
+ resp = rel_mem_store.relationships(mal, relationship_type='indicates')
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == RELATIONSHIP_IDS[0]
+
+
+def test_relationships_by_source(rel_mem_store):
+ resp = rel_mem_store.relationships(MALWARE_ID, source_only=True)
+
+ assert len(resp) == 1
+ assert resp[0]['id'] == RELATIONSHIP_IDS[1]
+
+
+def test_relationships_by_target(rel_mem_store):
+ resp = rel_mem_store.relationships(MALWARE_ID, target_only=True)
+
+ assert len(resp) == 2
+ assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_target_and_type(rel_mem_store):
+ resp = rel_mem_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
+
+ assert len(resp) == 1
+ assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
+
+
+def test_relationships_by_target_and_source(rel_mem_store):
+ with pytest.raises(ValueError) as excinfo:
+ rel_mem_store.relationships(MALWARE_ID, target_only=True, source_only=True)
+
+ assert 'not both' in str(excinfo.value)
+
+
+def test_related_to(rel_mem_store):
+ mal = rel_mem_store.get(MALWARE_ID)
+ resp = rel_mem_store.related_to(mal)
+
+ assert len(resp) == 3
+ assert any(x['id'] == CAMPAIGN_ID for x in resp)
+ assert any(x['id'] == INDICATOR_ID for x in resp)
+ assert any(x['id'] == IDENTITY_ID for x in resp)
+
+
+def test_related_to_by_source(rel_mem_store):
+ resp = rel_mem_store.related_to(MALWARE_ID, source_only=True)
+
+ assert len(resp) == 1
+ assert any(x['id'] == IDENTITY_ID for x in resp)
+
+
+def test_related_to_by_target(rel_mem_store):
+ resp = rel_mem_store.related_to(MALWARE_ID, target_only=True)
+
+ assert len(resp) == 2
+ assert any(x['id'] == CAMPAIGN_ID for x in resp)
+ assert any(x['id'] == INDICATOR_ID for x in resp)
diff --git a/stix2/test/test_utils.py b/stix2/test/test_utils.py
index c73bcd2..cbe5b0f 100644
--- a/stix2/test/test_utils.py
+++ b/stix2/test/test_utils.py
@@ -74,3 +74,11 @@ def test_get_dict(data):
def test_get_dict_invalid(data):
with pytest.raises(ValueError):
stix2.utils.get_dict(data)
+
+
+@pytest.mark.parametrize('stix_id, typ', [
+ ('malware--d69c8146-ab35-4d50-8382-6fc80e641d43', 'malware'),
+ ('intrusion-set--899ce53f-13a0-479b-a0e4-67d46e241542', 'intrusion-set')
+])
+def test_get_type_from_id(stix_id, typ):
+ assert stix2.utils.get_type_from_id(stix_id) == typ
diff --git a/stix2/utils.py b/stix2/utils.py
index 701fe56..386db31 100644
--- a/stix2/utils.py
+++ b/stix2/utils.py
@@ -314,3 +314,7 @@ def remove_custom_stix(stix_obj):
else:
return stix_obj
+
+
+def get_type_from_id(stix_id):
+ return stix_id.split('--', 1)[0]
diff --git a/stix2/v20/common.py b/stix2/v20/common.py
index 2d15529..ef45060 100644
--- a/stix2/v20/common.py
+++ b/stix2/v20/common.py
@@ -145,7 +145,14 @@ def CustomMarking(type='x-custom-marking', properties=None):
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
- cls.__init__(self, **kwargs)
+ try:
+ cls.__init__(self, **kwargs)
+ except (AttributeError, TypeError) as e:
+ # Don't accidentally catch errors raised in a custom __init__()
+ if ("has no attribute '__init__'" in str(e) or
+ str(e) == "object.__init__() takes no parameters"):
+ return
+ raise e
_register_marking(_Custom)
return _Custom
diff --git a/tox.ini b/tox.ini
index fe4fb01..bfc8c1b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = py27,py33,py34,py35,py36,pycodestyle,isort-check
+envlist = py27,py34,py35,py36,pycodestyle,isort-check
[testenv]
deps =
@@ -36,7 +36,6 @@ commands =
[travis]
python =
2.7: py27, pycodestyle
- 3.3: py33, pycodestyle
3.4: py34, pycodestyle
3.5: py35, pycodestyle
3.6: py36, pycodestyle