Merge pull request #88 from oasis-open/76-filesystem-bundles

Filesystem Bundles and custom content
stix2.0
Greg Back 2017-10-31 19:44:43 +00:00 committed by GitHub
commit 07a5d3a98e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 925 additions and 455 deletions

View File

@ -40,7 +40,14 @@ class _STIXBase(collections.Mapping):
"""Base class for STIX object types"""
def object_properties(self):
return list(self._properties.keys())
props = set(self._properties.keys())
custom_props = list(set(self._inner.keys()) - props)
custom_props.sort()
all_properties = list(self._properties.keys())
all_properties.extend(custom_props) # Any custom properties to the bottom
return all_properties
def _check_property(self, prop_name, prop, kwargs):
if prop_name not in kwargs:

View File

@ -7,9 +7,9 @@ from .base import _STIXBase
from .common import MarkingDefinition
from .properties import IDProperty, ListProperty, Property, TypeProperty
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
IntrusionSet, Malware, ObservedData, Report, ThreatActor,
Tool, Vulnerability)
from .sro import Relationship, Sighting
IntrusionSet, Malware, ObservedData, Report,
STIXDomainObject, ThreatActor, Tool, Vulnerability)
from .sro import Relationship, Sighting, STIXRelationshipObject
from .utils import get_dict
@ -20,6 +20,11 @@ class STIXObjectProperty(Property):
super(STIXObjectProperty, self).__init__()
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
if isinstance(value, (STIXDomainObject, STIXRelationshipObject,
MarkingDefinition)):
return value
try:
dictified = get_dict(value)
except ValueError:

View File

@ -44,36 +44,40 @@ class DataStore(object):
self.source = source
self.sink = sink
def get(self, stix_id):
def get(self, stix_id, allow_custom=False):
"""Retrieve the most recent version of a single STIX object by ID.
Translate get() call to the appropriate DataSource call.
Args:
stix_id (str): the id of the STIX object to retrieve.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the single most recent version of the STIX
object specified by the "id".
"""
return self.source.get(stix_id)
return self.source.get(stix_id, allow_custom=allow_custom)
def all_versions(self, stix_id):
def all_versions(self, stix_id, allow_custom=False):
"""Retrieve all versions of a single STIX object by ID.
Implement: Translate all_versions() call to the appropriate DataSource call
Args:
stix_id (str): the id of the STIX object to retrieve.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
"""
return self.source.all_versions(stix_id)
return self.source.all_versions(stix_id, allow_custom=allow_custom)
def query(self, query):
def query(self, query=None, allow_custom=False):
"""Retrieve STIX objects matching a set of filters.
Implement: Specific data source API calls, processing,
@ -82,6 +86,8 @@ class DataStore(object):
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on.
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
@ -89,15 +95,17 @@ class DataStore(object):
"""
return self.source.query(query=query)
def add(self, stix_objs):
def add(self, stix_objs, allow_custom=False):
"""Store STIX objects.
Translates add() to the appropriate DataSink call.
Args:
stix_objs (list): a list of STIX objects
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
return self.sink.add(stix_objs)
return self.sink.add(stix_objs, allow_custom=allow_custom)
class DataSink(object):
@ -111,7 +119,7 @@ class DataSink(object):
def __init__(self):
self.id = make_id()
def add(self, stix_objs):
def add(self, stix_objs, allow_custom=False):
"""Store STIX objects.
Implement: Specific data sink API calls, processing,
@ -120,6 +128,8 @@ class DataSink(object):
Args:
stix_objs (list): a list of STIX objects (where each object is a
STIX object)
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
raise NotImplementedError()
@ -139,7 +149,7 @@ class DataSource(object):
self.id = make_id()
self.filters = set()
def get(self, stix_id, _composite_filters=None):
def get(self, stix_id, _composite_filters=None, allow_custom=False):
"""
Implement: Specific data source API calls, processing,
functionality required for retrieving data from the data source
@ -148,9 +158,10 @@ class DataSource(object):
stix_id (str): the id of the STIX 2.0 object to retrieve. Should
return a single object, the most recent version of the object
specified by the "id".
_composite_filters (set): set of filters passed from the parent
the CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the STIX object
@ -158,7 +169,7 @@ class DataSource(object):
"""
raise NotImplementedError()
def all_versions(self, stix_id, _composite_filters=None):
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""
Implement: Similar to get() except returns list of all object versions of
the specified "id". In addition, implement the specific data
@ -169,9 +180,10 @@ class DataSource(object):
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
return a list of objects, all the versions of the object
specified by the "id".
_composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
@ -179,7 +191,7 @@ class DataSource(object):
"""
raise NotImplementedError()
def query(self, query, _composite_filters=None):
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""
Implement:Implement the specific data source API calls, processing,
functionality required for retrieving query from the data source
@ -187,9 +199,10 @@ class DataSource(object):
Args:
query (list): a list of filters (which collectively are the query)
to conduct search on
_composite_filters (set): a set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_objs (list): a list of STIX objects
@ -224,7 +237,7 @@ class CompositeDataSource(DataSource):
super(CompositeDataSource, self).__init__()
self.data_sources = []
def get(self, stix_id, _composite_filters=None):
def get(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX object by STIX ID
Federated retrieve method, iterates through all DataSources
@ -238,10 +251,11 @@ class CompositeDataSource(DataSource):
Args:
stix_id (str): the id of the STIX object to retrieve.
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to another parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
stix_obj: the STIX object to be returned.
@ -259,7 +273,7 @@ class CompositeDataSource(DataSource):
# for every configured Data Source, call its retrieve handler
for ds in self.data_sources:
data = ds.get(stix_id=stix_id, _composite_filters=all_filters)
data = ds.get(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
if data:
all_data.append(data)
@ -274,7 +288,7 @@ class CompositeDataSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None):
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects by STIX ID
Federated all_versions retrieve method - iterates through all DataSources
@ -285,10 +299,11 @@ class CompositeDataSource(DataSource):
Args:
stix_id (str): id of the STIX objects to retrieve
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to a parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
all_data (list): list of STIX objects that have the specified id
@ -307,7 +322,7 @@ class CompositeDataSource(DataSource):
# retrieve STIX objects from all configured data sources
for ds in self.data_sources:
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0 objects
@ -317,7 +332,7 @@ class CompositeDataSource(DataSource):
return all_data
def query(self, query=None, _composite_filters=None):
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects that match query
Federate the query to all DataSources attached to the
@ -325,10 +340,11 @@ class CompositeDataSource(DataSource):
Args:
query (list): list of filters to search on
_composite_filters (list): a list of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is attached
to a parent CompositeDataSource), not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
all_data (list): list of STIX objects to be returned
@ -353,7 +369,7 @@ class CompositeDataSource(DataSource):
# federate query to all attached data sources,
# pass composite filters to id
for ds in self.data_sources:
data = ds.query(query=query, _composite_filters=all_filters)
data = ds.query(query=query, _composite_filters=all_filters, allow_custom=allow_custom)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0

View File

@ -8,51 +8,54 @@ TODO:
import json
import os
from stix2.base import _STIXBase
from stix2.common import MarkingDefinition
from stix2.core import Bundle, parse
from stix2.sdo import STIXDomainObject
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
from stix2.sro import STIXRelationshipObject
from stix2.utils import deduplicate
class FileSystemStore(DataStore):
"""FileSystemStore
"""Interface to a file directory of STIX objects.
Provides an interface to an file directory of STIX objects.
FileSystemStore is a wrapper around a paired FileSystemSink
and FileSystemSource.
Args:
stix_dir (str): path to directory of STIX objects
bundlify (bool): Whether to wrap objects in bundles when saving them.
Default: False.
Attributes:
source (FileSystemSource): FuleSystemSource
sink (FileSystemSink): FileSystemSink
"""
def __init__(self, stix_dir):
def __init__(self, stix_dir, bundlify=False):
super(FileSystemStore, self).__init__()
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
class FileSystemSink(DataSink):
"""FileSystemSink
Provides an interface for adding/pushing STIX objects
to file directory of STIX objects.
"""Interface for adding/pushing STIX objects to file directory of STIX
objects.
Can be paired with a FileSystemSource, together as the two
components of a FileSystemStore.
Args:
stix_dir (str): path to directory of STIX objects
stix_dir (str): path to directory of STIX objects.
bundlify (bool): Whether to wrap objects in bundles when saving them.
Default: False.
"""
def __init__(self, stix_dir):
def __init__(self, stix_dir, bundlify=False):
super(FileSystemSink, self).__init__()
self._stix_dir = os.path.abspath(stix_dir)
self.bundlify = bundlify
if not os.path.exists(self._stix_dir):
raise ValueError("directory path for STIX data does not exist")
@ -61,62 +64,69 @@ class FileSystemSink(DataSink):
def stix_dir(self):
return self._stix_dir
def add(self, stix_data=None):
"""add STIX objects to file directory
Args:
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
in a STIX object(or list of), dict (or list of), or a STIX 2.0
json encoded string
TODO: Bundlify STIX content or no? When dumping to disk.
def _check_path_and_write(self, stix_obj):
"""Write the given STIX object to a file in the STIX file directory.
"""
def _check_path_and_write(stix_dir, stix_obj):
path = os.path.join(stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
path = os.path.join(self._stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
if self.bundlify:
stix_obj = Bundle(stix_obj)
with open(path, "w") as f:
# Bundle() can take dict or STIX obj as argument
f.write(str(Bundle(stix_obj)))
f.write(str(stix_obj))
if isinstance(stix_data, _STIXBase):
def add(self, stix_data=None, allow_custom=False):
"""Add STIX objects to file directory.
Args:
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
in a STIX object (or list of), dict (or list of), or a STIX 2.0
json encoded string.
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
Note:
``stix_data`` can be a Bundle object, but each object in it will be
saved separately; you will be able to retrieve any of the objects
the Bundle contained, but not the Bundle itself.
"""
if isinstance(stix_data, (STIXDomainObject, STIXRelationshipObject, MarkingDefinition)):
# adding python STIX object
_check_path_and_write(self._stix_dir, stix_data)
self._check_path_and_write(stix_data)
elif isinstance(stix_data, dict):
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom)
if stix_data["type"] == "bundle":
# adding json-formatted Bundle - extracting STIX objects
for stix_obj in stix_data["objects"]:
# extract STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj)
else:
# adding json-formatted STIX
_check_path_and_write(self._stix_dir, stix_data)
self._check_path_and_write(stix_data)
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data)
if stix_data["type"] == "bundle":
for stix_obj in stix_data["objects"]:
elif isinstance(stix_data, Bundle):
# recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj)
else:
self.add(stix_data)
elif isinstance(stix_data, list):
# if list, recurse call on individual STIX objects
# recursively add individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj)
else:
raise ValueError("stix_data must be a STIX object(or list of), json formatted STIX(or list of) or a json formatted STIX bundle")
raise TypeError("stix_data must be a STIX object (or list of), "
"JSON formatted STIX (or list of), "
"or a JSON formatted STIX bundle")
class FileSystemSource(DataSource):
"""FileSystemSource
Provides an interface for searching/retrieving
STIX objects from a STIX object file directory.
"""Interface for searching/retrieving STIX objects from a STIX object file
directory.
Can be paired with a FileSystemSink, together as the two
components of a FileSystemStore.
@ -136,14 +146,15 @@ class FileSystemSource(DataSource):
def stix_dir(self):
return self._stix_dir
def get(self, stix_id, _composite_filters=None):
"""retrieve STIX object from file directory via STIX ID
def get(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX object from file directory via STIX ID.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(STIX object): STIX object that has the supplied STIX ID.
@ -153,47 +164,49 @@ class FileSystemSource(DataSource):
"""
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
if all_data:
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
stix_obj = parse(stix_obj)
else:
stix_obj = None
return stix_obj
def all_versions(self, stix_id, _composite_filters=None):
"""retrieve STIX object from file directory via STIX ID, all versions
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX object from file directory via STIX ID, all versions.
Note: Since FileSystem sources/sinks don't handle multiple versions
of a STIX object, this operation is unnecessary. Pass call to get().
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): of STIX objects that has the supplied STIX ID.
The STIX objects are loaded from their json files, parsed into
a python STIX objects and then returned
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):
"""search and retrieve STIX objects based on the complete query
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters, allow_custom=allow_custom)]
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
attached to MemorySource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters)
attached to this FileSystemSource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters).
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that matches the supplied
@ -209,7 +222,7 @@ class FileSystemSource(DataSource):
if not isinstance(query, list):
# make sure dont make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = list(query)
query = [query]
query = set(query)
# combine all query filters
@ -254,8 +267,8 @@ class FileSystemSource(DataSource):
# so query will look in all STIX directories that are not
# the specified type. Compile correct dir paths
for dir in os.listdir(self._stix_dir):
if os.path.abspath(dir) not in declude_paths:
include_paths.append(os.path.abspath(dir))
if os.path.abspath(os.path.join(self._stix_dir, dir)) not in declude_paths:
include_paths.append(os.path.abspath(os.path.join(self._stix_dir, dir)))
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
@ -273,34 +286,32 @@ class FileSystemSource(DataSource):
for path in include_paths:
for root, dirs, files in os.walk(path):
for file_ in files:
if id_:
if id_ == file_.split(".")[0]:
# since ID is specified in one of filters, can evaluate against filename first without loading
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
# check against other filters, add if match
all_data.extend(apply_common_filters([stix_obj], query))
else:
if not id_ or id_ == file_.split(".")[0]:
# have to load into memory regardless to evaluate other filters
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
stix_obj = json.load(open(os.path.join(root, file_)))
if stix_obj.get('type', '') == 'bundle':
stix_obj = stix_obj['objects'][0]
# check against other filters, add if match
all_data.extend(apply_common_filters([stix_obj], query))
all_data = deduplicate(all_data)
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom) for stix_obj_dict in all_data]
return stix_objs
def _parse_file_filters(self, query):
"""utility method to extract STIX common filters
that can used to possibly speed up querying STIX objects
from the file system
"""Extract STIX common filters.
Possibly speeds up querying STIX objects from the file system.
Extracts filters that are for the "id" and "type" field of
a STIX object. As the file directory is organized by STIX
object type with filenames that are equivalent to the STIX
object ID, these filters can be used first to reduce the
search space of a FileSystemStore(or FileSystemSink)
search space of a FileSystemStore (or FileSystemSink).
"""
file_filters = set()
for filter_ in query:

View File

@ -24,16 +24,18 @@ from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter, apply_common_filters
def _add(store, stix_data=None):
"""Adds STIX objects to MemoryStore/Sink.
def _add(store, stix_data=None, allow_custom=False):
"""Add STIX objects to MemoryStore/Sink.
Adds STIX objects to an in-memory dictionary for fast lookup.
Recursive function, breaks down STIX Bundles and lists.
Args:
stix_data (list OR dict OR STIX object): STIX objects to be added
"""
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
if isinstance(stix_data, _STIXBase):
# adding a python STIX object
store._data[stix_data["id"]] = stix_data
@ -41,35 +43,35 @@ def _add(store, stix_data=None):
elif isinstance(stix_data, dict):
if stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
for stix_obj in stix_data["objects"]:
_add(store, stix_obj)
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom=allow_custom)
else:
# adding a json STIX object
store._data[stix_data["id"]] = stix_data
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data)
stix_data = parse(stix_data, allow_custom=allow_custom)
if stix_data["type"] == "bundle":
# recurse on each STIX object in bundle
for stix_obj in stix_data:
_add(store, stix_obj)
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom=allow_custom)
else:
_add(store, stix_data)
elif isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj)
_add(store, stix_obj, allow_custom=allow_custom)
else:
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
raise TypeError("stix_data must be a STIX object (or list of), JSON formatted STIX (or list of), or a JSON formatted STIX bundle")
class MemoryStore(DataStore):
"""Provides an interface to an in-memory dictionary
of STIX objects. MemoryStore is a wrapper around a paired
MemorySink and MemorySource
"""Interface to an in-memory dictionary of STIX objects.
MemoryStore is a wrapper around a paired MemorySink and MemorySource.
Note: It doesn't make sense to create a MemoryStore by passing
in existing MemorySource and MemorySink because there could
@ -77,36 +79,54 @@ class MemoryStore(DataStore):
Args:
stix_data (list OR dict OR STIX object): STIX content to be added
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
Attributes:
_data (dict): the in-memory dict that holds STIX objects
source (MemorySource): MemorySource
sink (MemorySink): MemorySink
"""
def __init__(self, stix_data=None):
def __init__(self, stix_data=None, allow_custom=False):
super(MemoryStore, self).__init__()
self._data = {}
if stix_data:
_add(self, stix_data)
_add(self, stix_data, allow_custom=allow_custom)
self.source = MemorySource(stix_data=self._data, _store=True)
self.sink = MemorySink(stix_data=self._data, _store=True)
self.source = MemorySource(stix_data=self._data, _store=True, allow_custom=allow_custom)
self.sink = MemorySink(stix_data=self._data, _store=True, allow_custom=allow_custom)
def save_to_file(self, file_path):
return self.sink.save_to_file(file_path=file_path)
def save_to_file(self, file_path, allow_custom=False):
"""Write SITX objects from in-memory dictionary to JSON file, as a STIX
Bundle.
def load_from_file(self, file_path):
return self.source.load_from_file(file_path=file_path)
Args:
file_path (str): file path to write STIX data to
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
return self.sink.save_to_file(file_path=file_path, allow_custom=allow_custom)
def load_from_file(self, file_path, allow_custom=False):
"""Load STIX data from JSON file.
File format is expected to be a single JSON
STIX object or JSON STIX bundle.
Args:
file_path (str): file path to load STIX data from
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
return self.source.load_from_file(file_path=file_path, allow_custom=allow_custom)
class MemorySink(DataSink):
"""Provides an interface for adding/pushing STIX objects
to an in-memory dictionary.
"""Interface for adding/pushing STIX objects to an in-memory dictionary.
Designed to be paired with a MemorySource, together as the two
components of a MemoryStore.
@ -114,51 +134,43 @@ class MemorySink(DataSink):
Args:
stix_data (dict OR list): valid STIX 2.0 content in
bundle or a list.
_store (bool): if the MemorySink is a part of a DataStore,
in which case "stix_data" is a direct reference to
shared memory with DataSource. Not user supplied
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
Attributes:
_data (dict): the in-memory dict that holds STIX objects.
If apart of a MemoryStore, dict is shared between with
a MemorySource
"""
def __init__(self, stix_data=None, _store=False):
"""
def __init__(self, stix_data=None, _store=False, allow_custom=False):
super(MemorySink, self).__init__()
self._data = {}
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data)
_add(self, stix_data, allow_custom=allow_custom)
def add(self, stix_data):
"""add STIX objects to in-memory dictionary maintained by
the MemorySink (MemoryStore)
def add(self, stix_data, allow_custom=False):
_add(self, stix_data, allow_custom=allow_custom)
add.__doc__ = _add.__doc__
see "_add()" for args documentation
"""
_add(self, stix_data)
def save_to_file(self, file_path):
"""write SITX objects in in-memory dictionary to json file, as a STIX Bundle
Args:
file_path (str): file path to write STIX data to
"""
def save_to_file(self, file_path, allow_custom=False):
file_path = os.path.abspath(file_path)
if not os.path.exists(os.path.dirname(file_path)):
os.makedirs(os.path.dirname(file_path))
with open(file_path, "w") as f:
f.write(str(Bundle(self._data.values())))
f.write(str(Bundle(self._data.values(), allow_custom=allow_custom)))
save_to_file.__doc__ = MemoryStore.save_to_file.__doc__
class MemorySource(DataSource):
"""Provides an interface for searching/retrieving
STIX objects from an in-memory dictionary.
"""Interface for searching/retrieving STIX objects from an in-memory
dictionary.
Designed to be paired with a MemorySink, together as the two
components of a MemoryStore.
@ -166,42 +178,44 @@ class MemorySource(DataSource):
Args:
stix_data (dict OR list OR STIX object): valid STIX 2.0 content in
bundle or list.
_store (bool): if the MemorySource is a part of a DataStore,
in which case "stix_data" is a direct reference to shared
memory with DataSink. Not user supplied
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
Attributes:
_data (dict): the in-memory dict that holds STIX objects.
If apart of a MemoryStore, dict is shared between with
a MemorySink
"""
def __init__(self, stix_data=None, _store=False):
"""
def __init__(self, stix_data=None, _store=False, allow_custom=False):
super(MemorySource, self).__init__()
self._data = {}
if _store:
self._data = stix_data
elif stix_data:
_add(self, stix_data)
_add(self, stix_data, allow_custom=allow_custom)
def get(self, stix_id, _composite_filters=None):
"""retrieve STIX object from in-memory dict via STIX ID
def get(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX object from in-memory dict via STIX ID.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(dict OR STIX object): STIX object that has the supplied
ID. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
as they are supplied (either as python dictionary or STIX object), it
is returned in the same form as it as added
"""
"""
if _composite_filters is None:
# if get call is only based on 'id', no need to search, just retrieve from dict
try:
@ -213,7 +227,7 @@ class MemorySource(DataSource):
# if there are filters from the composite level, process full query
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
if all_data:
# reduce to most recent version
@ -223,17 +237,18 @@ class MemorySource(DataSource):
else:
return None
def all_versions(self, stix_id, _composite_filters=None):
"""retrieve STIX objects from in-memory dict via STIX ID, all versions of it
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects from in-memory dict via STIX ID, all versions of it
Note: Since Memory sources/sinks don't handle multiple versions of a
STIX object, this operation is unnecessary. Translate call to get().
Args:
stix_id (str): The STIX ID of the STIX 2 object to retrieve.
composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that has the supplied ID. As the
@ -242,26 +257,27 @@ class MemorySource(DataSource):
is returned in the same form as it as added
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters, allow_custom=allow_custom)]
def query(self, query=None, _composite_filters=None):
"""search and retrieve STIX objects based on the complete query
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
attached to MemorySource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters)
attached to this MemorySource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters).
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that matches the supplied
query. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
as they are supplied (either as python dictionary or STIX object), it
is returned in the same form as it as added
is returned in the same form as it as added.
"""
if query is None:
@ -270,7 +286,7 @@ class MemorySource(DataSource):
if not isinstance(query, list):
# make sure dont make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter)
query = list(query)
query = [query]
query = set(query)
# combine all query filters
@ -284,15 +300,8 @@ class MemorySource(DataSource):
return all_data
def load_from_file(self, file_path):
"""load STIX data from json file
File format is expected to be a single json
STIX object or json STIX bundle
Args:
file_path (str): file path to load STIX data from
"""
def load_from_file(self, file_path, allow_custom=False):
file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r"))
_add(self, stix_data)
_add(self, stix_data, allow_custom=allow_custom)
load_from_file.__doc__ = MemoryStore.load_from_file.__doc__

View File

@ -37,39 +37,40 @@ class TAXIICollectionSink(DataSink):
super(TAXIICollectionSink, self).__init__()
self.collection = collection
def add(self, stix_data):
"""add/push STIX content to TAXII Collection endpoint
def add(self, stix_data, allow_custom=False):
"""Add/push STIX content to TAXII Collection endpoint
Args:
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
in a STIX object (or Bundle), STIX onject dict (or Bundle dict), or a STIX 2.0
json encoded string, or list of any of the following
allow_custom (bool): whether to allow custom objects/properties or
not. Default: False.
"""
if isinstance(stix_data, _STIXBase):
# adding python STIX object
bundle = dict(Bundle(stix_data))
bundle = dict(Bundle(stix_data, allow_custom=allow_custom))
elif isinstance(stix_data, dict):
# adding python dict (of either Bundle or STIX obj)
if stix_data["type"] == "bundle":
bundle = stix_data
else:
bundle = dict(Bundle(stix_data))
bundle = dict(Bundle(stix_data, allow_custom=allow_custom))
elif isinstance(stix_data, list):
# adding list of something - recurse on each
for obj in stix_data:
self.add(obj)
self.add(obj, allow_custom=allow_custom)
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data)
stix_data = parse(stix_data, allow_custom=allow_custom)
if stix_data["type"] == "bundle":
bundle = dict(stix_data)
else:
bundle = dict(Bundle(stix_data))
bundle = dict(Bundle(stix_data, allow_custom=allow_custom))
else:
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
@ -89,22 +90,22 @@ class TAXIICollectionSource(DataSource):
super(TAXIICollectionSource, self).__init__()
self.collection = collection
def get(self, stix_id, _composite_filters=None):
"""retrieve STIX object from local/remote STIX Collection
def get(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX object from local/remote STIX Collection
endpoint.
Args:
stix_id (str): The STIX ID of the STIX object to be retrieved.
composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(STIX object): STIX object that has the supplied STIX ID.
The STIX object is received from TAXII has dict, parsed into
a python STIX object and then returned
"""
# combine all query filters
query = set()
@ -120,7 +121,7 @@ class TAXIICollectionSource(DataSource):
stix_obj = list(apply_common_filters(stix_objs, query))
if len(stix_obj):
stix_obj = parse(stix_obj[0])
stix_obj = parse(stix_obj[0], allow_custom=allow_custom)
if stix_obj.id != stix_id:
# check - was added to handle erroneous TAXII servers
stix_obj = None
@ -129,15 +130,16 @@ class TAXIICollectionSource(DataSource):
return stix_obj
def all_versions(self, stix_id, _composite_filters=None):
"""retrieve STIX object from local/remote TAXII Collection
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX object from local/remote TAXII Collection
endpoint, all versions of it
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
composite_filters (set): set of filters passed from the parent
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(see query() as all_versions() is just a wrapper)
@ -149,7 +151,7 @@ class TAXIICollectionSource(DataSource):
Filter("match[version]", "=", "all")
]
all_data = self.query(query=query, _composite_filters=_composite_filters)
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
# parse STIX objects from TAXII returned json
all_data = [parse(stix_obj) for stix_obj in all_data]
@ -159,8 +161,8 @@ class TAXIICollectionSource(DataSource):
return all_data_clean
def query(self, query=None, _composite_filters=None):
"""search and retreive STIX objects based on the complete query
def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Search and retreive STIX objects based on the complete query
A "complete query" includes the filters from the query, the filters
attached to MemorySource, and any filters passed from a
@ -168,9 +170,10 @@ class TAXIICollectionSource(DataSource):
Args:
query (list): list of filters to search on
composite_filters (set): set of filters passed from the
CompositeDataSource, not user supplied
allow_custom (bool): whether to retrieve custom objects/properties
or not. Default: False.
Returns:
(list): list of STIX objects that matches the supplied
@ -178,7 +181,6 @@ class TAXIICollectionSource(DataSource):
parsed into python STIX objects and then returned.
"""
if query is None:
query = set()
else:
@ -198,7 +200,7 @@ class TAXIICollectionSource(DataSource):
taxii_filters = self._parse_taxii_filters(query)
# query TAXII collection
all_data = self.collection.get_objects(filters=taxii_filters)["objects"]
all_data = self.collection.get_objects(filters=taxii_filters, allow_custom=allow_custom)["objects"]
# deduplicate data (before filtering as reduces wasted filtering)
all_data = deduplicate(all_data)
@ -207,7 +209,7 @@ class TAXIICollectionSource(DataSource):
all_data = list(apply_common_filters(all_data, query))
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data]
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom) for stix_obj_dict in all_data]
return stix_objs
@ -229,7 +231,6 @@ class TAXIICollectionSource(DataSource):
for 'requests.get()'.
"""
params = {}
for filter_ in query:

View File

@ -1,6 +1,3 @@
{
"id": "bundle--2ed6ab6a-ca68-414f-8493-e4db8b75dd51",
"objects": [
{
"created": "2017-05-31T21:30:41.022744Z",
"created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
@ -10,7 +7,3 @@
"name": "Data from Network Shared Drive Mitigation",
"type": "course-of-action"
}
],
"spec_version": "2.0",
"type": "bundle"
}

View File

@ -158,3 +158,10 @@ def test_parse_unknown_type():
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse(unknown)
assert str(excinfo.value) == "Can't parse unknown object type 'other'! For custom types, use the CustomObject decorator."
def test_stix_object_property():
prop = stix2.core.STIXObjectProperty()
identity = stix2.Identity(name="test", identity_class="individual")
assert prop.clean(identity) is identity

View File

@ -91,6 +91,7 @@ def test_custom_property_in_bundled_object():
bundle = stix2.Bundle(identity, allow_custom=True)
assert bundle.objects[0].x_foo == "bar"
assert '"x_foo": "bar"' in str(bundle)
@stix2.sdo.CustomObject('x-new-type', [

View File

@ -1,17 +1,13 @@
import os
import pytest
from taxii2client import Collection
from stix2 import (Campaign, FileSystemSink, FileSystemSource, FileSystemStore,
Filter, MemorySource, MemoryStore)
from stix2 import Filter, MemorySource
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2.sources.filters import apply_common_filters
from stix2.utils import deduplicate
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
class MockTAXIIClient(object):
@ -148,28 +144,6 @@ def test_ds_abstract_class_smoke():
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
def test_memory_store_smoke():
# Initialize MemoryStore with dict
ms = MemoryStore(STIX_OBJS1)
# Add item to sink
ms.add(dict(id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
spec_version="2.0",
type="bundle"))
resp = ms.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert len(resp) == 1
resp = ms.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
query = [Filter('type', '=', 'malware')]
resp = ms.query(query)
assert len(resp) == 0
def test_ds_taxii(collection):
ds = taxii.TAXIICollectionSource(collection)
assert ds.collection is not None
@ -512,207 +486,3 @@ def test_composite_datasource_operations():
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
def test_filesytem_source():
# creation
fs_source = FileSystemSource(FS_PATH)
assert fs_source.stix_dir == FS_PATH
# get object
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
assert id_.name == "The MITRE Corporation"
assert id_.type == "identity"
# query
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
assert len(intrusion_sets) == 2
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
assert "DragonOK" in is_1.aliases
assert len(is_1.external_references) == 4
# query2
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
assert len(is_2) == 1
is_2 = is_2[0]
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
assert is_2.type == "attack-pattern"
def test_filesystem_sink():
# creation
fs_sink = FileSystemSink(FS_PATH)
assert fs_sink.stix_dir == FS_PATH
fs_source = FileSystemSource(FS_PATH)
# Test all the ways stix objects can be added (via different supplied forms)
# add python stix object
camp1 = Campaign(name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"])
fs_sink.add(camp1)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
camp1_r = fs_source.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == "Hannibal"
assert "War Elephant" in camp1_r.aliases
# add stix object dict
camp2 = {
"name": "Aurelius",
"type": "campaign",
"objective": "German and French Intelligence Services",
"aliases": ["Purple Robes"],
"id": "campaign--111111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add(camp2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
camp2_r = fs_source.get(camp2["id"])
assert camp2_r.id == camp2["id"]
assert camp2_r.name == camp2["name"]
assert "Purple Robes" in camp2_r.aliases
# add stix bundle dict
bund = {
"type": "bundle",
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
"spec_version": "2.0",
"objects": [
{
"name": "Atilla",
"type": "campaign",
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
"aliases": ["Huns"],
"id": "campaign--133111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
]
}
fs_sink.add(bund)
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
camp3_r = fs_source.get(bund["objects"][0]["id"])
assert camp3_r.id == bund["objects"][0]["id"]
assert camp3_r.name == bund["objects"][0]["name"]
assert "Huns" in camp3_r.aliases
# add json-encoded stix obj
camp4 = '{"type": "campaign", "id":"campaign--144111b6-1112-4fb0-111b-b111107ca70a",'\
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
fs_sink.add(camp4)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--144111b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp4_r = fs_source.get("campaign--144111b6-1112-4fb0-111b-b111107ca70a")
assert camp4_r.id == "campaign--144111b6-1112-4fb0-111b-b111107ca70a"
assert camp4_r.name == "Ghengis Khan"
# add json-encoded stix bundle
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
fs_sink.add(bund2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--155155b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp5_r = fs_source.get("campaign--155155b6-1112-4fb0-111b-b111107ca70a")
assert camp5_r.id == "campaign--155155b6-1112-4fb0-111b-b111107ca70a"
assert camp5_r.name == "Spartacus"
# add list of objects
camp6 = Campaign(name="Comanche",
objective="US Midwest manufacturing firms, oil refineries, and businesses",
aliases=["Horse Warrior"])
camp7 = {
"name": "Napolean",
"type": "campaign",
"objective": "Central and Eastern Europe military commands and departments",
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add([camp6, camp7])
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp6_r = fs_source.get(camp6.id)
assert camp6_r.id == camp6.id
assert "Horse Warrior" in camp6_r.aliases
camp7_r = fs_source.get(camp7["id"])
assert camp7_r.id == camp7["id"]
assert "The Frenchmen" in camp7_r.aliases
# remove all added objects
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
# remove campaign dir (that was added in course of testing)
os.rmdir(os.path.join(FS_PATH, "campaign"))
def test_filesystem_store():
# creation
fs_store = FileSystemStore(FS_PATH)
# get()
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
assert coa.type == "course-of-action"
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
assert rel.type == "relationship"
# query()
tools = fs_store.query([Filter("labels", "in", "tool")])
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
# add()
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
# remove
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
# remove campaign dir
os.rmdir(os.path.join(FS_PATH, "campaign"))

View File

@ -0,0 +1,377 @@
import os
import shutil
import pytest
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
FileSystemSource, FileSystemStore, Filter, properties)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@pytest.fixture
def fs_store():
# create
yield FileSystemStore(FS_PATH)
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def fs_source():
# create
fs = FileSystemSource(FS_PATH)
assert fs.stix_dir == FS_PATH
yield fs
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def fs_sink():
# create
fs = FileSystemSink(FS_PATH)
assert fs.stix_dir == FS_PATH
yield fs
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
def test_filesystem_source_nonexistent_folder():
with pytest.raises(ValueError) as excinfo:
FileSystemSource('nonexistent-folder')
assert "for STIX data does not exist" in str(excinfo)
def test_filesystem_sink_nonexistent_folder():
with pytest.raises(ValueError) as excinfo:
FileSystemSink('nonexistent-folder')
assert "for STIX data does not exist" in str(excinfo)
def test_filesytem_source_get_object(fs_source):
# get object
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
def test_filesytem_source_get_nonexistent_object(fs_source):
ind = fs_source.get("indicator--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert ind is None
def test_filesytem_source_all_versions(fs_source):
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
assert id_.name == "The MITRE Corporation"
assert id_.type == "identity"
def test_filesytem_source_query_single(fs_source):
# query2
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
assert len(is_2) == 1
is_2 = is_2[0]
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
assert is_2.type == "attack-pattern"
def test_filesytem_source_query_multiple(fs_source):
# query
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
assert len(intrusion_sets) == 2
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
assert "DragonOK" in is_1.aliases
assert len(is_1.external_references) == 4
def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
# add python stix object
camp1 = Campaign(name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"])
fs_sink.add(camp1)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
camp1_r = fs_source.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == "Hannibal"
assert "War Elephant" in camp1_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
# add stix object dict
camp2 = {
"name": "Aurelius",
"type": "campaign",
"objective": "German and French Intelligence Services",
"aliases": ["Purple Robes"],
"id": "campaign--111111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add(camp2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
camp2_r = fs_source.get(camp2["id"])
assert camp2_r.id == camp2["id"]
assert camp2_r.name == camp2["name"]
assert "Purple Robes" in camp2_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
# add stix bundle dict
bund = {
"type": "bundle",
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
"spec_version": "2.0",
"objects": [
{
"name": "Atilla",
"type": "campaign",
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
"aliases": ["Huns"],
"id": "campaign--133111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
]
}
fs_sink.add(bund)
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
camp3_r = fs_source.get(bund["objects"][0]["id"])
assert camp3_r.id == bund["objects"][0]["id"]
assert camp3_r.name == bund["objects"][0]["name"]
assert "Huns" in camp3_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
# add json-encoded stix obj
camp4 = '{"type": "campaign", "id":"campaign--144111b6-1112-4fb0-111b-b111107ca70a",'\
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
fs_sink.add(camp4)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--144111b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp4_r = fs_source.get("campaign--144111b6-1112-4fb0-111b-b111107ca70a")
assert camp4_r.id == "campaign--144111b6-1112-4fb0-111b-b111107ca70a"
assert camp4_r.name == "Ghengis Khan"
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
# add json-encoded stix bundle
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
fs_sink.add(bund2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--155155b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp5_r = fs_source.get("campaign--155155b6-1112-4fb0-111b-b111107ca70a")
assert camp5_r.id == "campaign--155155b6-1112-4fb0-111b-b111107ca70a"
assert camp5_r.name == "Spartacus"
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
# add list of objects
camp6 = Campaign(name="Comanche",
objective="US Midwest manufacturing firms, oil refineries, and businesses",
aliases=["Horse Warrior"])
camp7 = {
"name": "Napolean",
"type": "campaign",
"objective": "Central and Eastern Europe military commands and departments",
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add([camp6, camp7])
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp6_r = fs_source.get(camp6.id)
assert camp6_r.id == camp6.id
assert "Horse Warrior" in camp6_r.aliases
camp7_r = fs_source.get(camp7["id"])
assert camp7_r.id == camp7["id"]
assert "The Frenchmen" in camp7_r.aliases
# remove all added objects
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
def test_filesystem_store_get_stored_as_bundle(fs_store):
coa = fs_store.get("course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f")
assert coa.id == "course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f"
assert coa.type == "course-of-action"
def test_filesystem_store_get_stored_as_object(fs_store):
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
assert coa.type == "course-of-action"
def test_filesystem_store_all_versions(fs_store):
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
assert rel.type == "relationship"
def test_filesystem_store_query(fs_store):
# query()
tools = fs_store.query([Filter("labels", "in", "tool")])
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
def test_filesystem_store_query_single_filter(fs_store):
query = Filter("labels", "in", "tool")
tools = fs_store.query(query)
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
def test_filesystem_store_empty_query(fs_store):
results = fs_store.query() # returns all
assert len(results) == 26
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results]
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results]
def test_filesystem_store_query_multiple_filters(fs_store):
fs_store.source.filters.add(Filter("labels", "in", "tool"))
tools = fs_store.query(Filter("id", "=", "tool--242f3da3-4425-4d11-8f5c-b842886da966"))
assert len(tools) == 1
assert tools[0].id == "tool--242f3da3-4425-4d11-8f5c-b842886da966"
def test_filesystem_store_query_dont_include_type_folder(fs_store):
results = fs_store.query(Filter("type", "!=", "tool"))
assert len(results) == 24
def test_filesystem_store_add(fs_store):
# add()
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
# remove
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
def test_filesystem_store_add_as_bundle():
fs_store = FileSystemStore(FS_PATH, bundlify=True)
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
with open(os.path.join(FS_PATH, "campaign", camp1.id + ".json")) as bundle_file:
assert '"type": "bundle"' in bundle_file.read()
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
def test_filesystem_add_bundle_object(fs_store):
bundle = Bundle()
fs_store.add(bundle)
def test_filesystem_store_add_invalid_object(fs_store):
ind = ('campaign', 'campaign--111111b6-1112-4fb0-111b-b111107ca70a') # tuple isn't valid
with pytest.raises(TypeError) as excinfo:
fs_store.add(ind)
assert 'stix_data must be' in str(excinfo.value)
assert 'a STIX object' in str(excinfo.value)
assert 'JSON formatted STIX' in str(excinfo.value)
assert 'JSON formatted STIX bundle' in str(excinfo.value)
def test_filesystem_object_with_custom_property(fs_store):
camp = Campaign(name="Scipio Africanus",
objective="Defeat the Carthaginians",
x_empire="Roman",
allow_custom=True)
fs_store.add(camp, True)
camp_r = fs_store.get(camp.id, True)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
def test_filesystem_object_with_custom_property_in_bundle(fs_store):
camp = Campaign(name="Scipio Africanus",
objective="Defeat the Carthaginians",
x_empire="Roman",
allow_custom=True)
bundle = Bundle(camp, allow_custom=True)
fs_store.add(bundle, True)
camp_r = fs_store.get(camp.id, True)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object(fs_store):
@CustomObject('x-new-obj', [
('property1', properties.StringProperty(required=True)),
])
class NewObj():
pass
newobj = NewObj(property1='something')
fs_store.add(newobj, True)
newobj_r = fs_store.get(newobj.id, True)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)

270
stix2/test/test_memory.py Normal file
View File

@ -0,0 +1,270 @@
import pytest
from stix2 import (Bundle, Campaign, CustomObject, Filter, MemorySource,
MemoryStore, properties)
from stix2.sources import make_id
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND2 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND3 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND4 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND5 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND6 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND7 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND8 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
@pytest.fixture
def mem_store():
yield MemoryStore(STIX_OBJS1)
@pytest.fixture
def mem_source():
yield MemorySource(STIX_OBJS1)
def test_memory_source_get(mem_source):
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
def test_memory_source_get_nonexistant_object(mem_source):
resp = mem_source.get("tool--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
assert resp is None
def test_memory_store_all_versions(mem_store):
# Add bundle of items to sink
mem_store.add(dict(id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
spec_version="2.0",
type="bundle"))
resp = mem_store.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert len(resp) == 1 # MemoryStore can only store 1 version of each object
def test_memory_store_query(mem_store):
query = [Filter('type', '=', 'malware')]
resp = mem_store.query(query)
assert len(resp) == 0
def test_memory_store_query_single_filter(mem_store):
query = Filter('id', '=', 'indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f')
resp = mem_store.query(query)
assert len(resp) == 1
def test_memory_store_query_empty_query(mem_store):
resp = mem_store.query()
# sort since returned in random order
resp = sorted(resp, key=lambda k: k['id'])
assert len(resp) == 2
assert resp[0]['id'] == 'indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f'
assert resp[0]['modified'] == '2017-01-27T13:49:53.935Z'
assert resp[1]['id'] == 'indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f'
assert resp[1]['modified'] == '2017-01-27T13:49:53.936Z'
def test_memory_store_query_multiple_filters(mem_store):
mem_store.source.filters.add(Filter('type', '=', 'indicator'))
query = Filter('id', '=', 'indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f')
resp = mem_store.query(query)
assert len(resp) == 1
def test_memory_store_add_stix_object_str(mem_store):
# add stix object string
camp_id = "campaign--111111b6-1112-4fb0-111b-b111107ca70a"
camp_name = "Aurelius"
camp_alias = "Purple Robes"
camp = """{
"name": "%s",
"type": "campaign",
"objective": "German and French Intelligence Services",
"aliases": ["%s"],
"id": "%s",
"created": "2017-05-31T21:31:53.197755Z"
}""" % (camp_name, camp_alias, camp_id)
mem_store.add(camp)
camp_r = mem_store.get(camp_id)
assert camp_r["id"] == camp_id
assert camp_r["name"] == camp_name
assert camp_alias in camp_r["aliases"]
def test_memory_store_add_stix_bundle_str(mem_store):
# add stix bundle string
camp_id = "campaign--133111b6-1112-4fb0-111b-b111107ca70a"
camp_name = "Atilla"
camp_alias = "Huns"
bund = """{
"type": "bundle",
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
"spec_version": "2.0",
"objects": [
{
"name": "%s",
"type": "campaign",
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
"aliases": ["%s"],
"id": "%s",
"created": "2017-05-31T21:31:53.197755Z"
}
]
}""" % (camp_name, camp_alias, camp_id)
mem_store.add(bund)
camp_r = mem_store.get(camp_id)
assert camp_r["id"] == camp_id
assert camp_r["name"] == camp_name
assert camp_alias in camp_r["aliases"]
def test_memory_store_add_invalid_object(mem_store):
ind = ('indicator', IND1) # tuple isn't valid
with pytest.raises(TypeError) as excinfo:
mem_store.add(ind)
assert 'stix_data must be' in str(excinfo.value)
assert 'a STIX object' in str(excinfo.value)
assert 'JSON formatted STIX' in str(excinfo.value)
assert 'JSON formatted STIX bundle' in str(excinfo.value)
def test_memory_store_object_with_custom_property(mem_store):
camp = Campaign(name="Scipio Africanus",
objective="Defeat the Carthaginians",
x_empire="Roman",
allow_custom=True)
mem_store.add(camp, True)
camp_r = mem_store.get(camp.id, True)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
def test_memory_store_object_with_custom_property_in_bundle(mem_store):
camp = Campaign(name="Scipio Africanus",
objective="Defeat the Carthaginians",
x_empire="Roman",
allow_custom=True)
bundle = Bundle(camp, allow_custom=True)
mem_store.add(bundle, True)
bundle_r = mem_store.get(bundle.id, True)
camp_r = bundle_r['objects'][0]
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
def test_memory_store_custom_object(mem_store):
@CustomObject('x-new-obj', [
('property1', properties.StringProperty(required=True)),
])
class NewObj():
pass
newobj = NewObj(property1='something')
mem_store.add(newobj, True)
newobj_r = mem_store.get(newobj.id, True)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'

View File

@ -34,7 +34,7 @@ class STIXdatetime(dt.datetime):
def deduplicate(stix_obj_list):
"""Deduplicate a list of STIX objects to a unique set
"""Deduplicate a list of STIX objects to a unique set.
Reduces a set of STIX objects to unique set by looking
at 'id' and 'modified' fields - as a unique object version
@ -44,7 +44,6 @@ def deduplicate(stix_obj_list):
of deduplicate(),that if the "stix_obj_list" argument has
multiple STIX objects of the same version, the last object
version found in the list will be the one that is returned.
()
Args:
stix_obj_list (list): list of STIX objects (dicts)
@ -56,7 +55,11 @@ def deduplicate(stix_obj_list):
unique_objs = {}
for obj in stix_obj_list:
try:
unique_objs[(obj['id'], obj['modified'])] = obj
except KeyError:
# Handle objects with no `modified` property, e.g. marking-definition
unique_objs[(obj['id'], obj['created'])] = obj
return list(unique_objs.values())