From 2399ee62ecee09cfac61dd1ede442fb7bc0d6312 Mon Sep 17 00:00:00 2001 From: = Date: Wed, 29 Nov 2017 12:03:10 -0500 Subject: [PATCH] adjusting tests --- stix2/sources/filesystem.py | 66 +++++++++++-------- stix2/sources/memory.py | 120 ++++++++++++++++------------------ stix2/sources/taxii.py | 62 ++++++++++-------- stix2/test/test_filesystem.py | 10 +-- stix2/test/test_memory.py | 102 ++++++++++++++--------------- stix2/test/test_versioning.py | 42 +++++++++++- stix2/utils.py | 11 ++-- 7 files changed, 231 insertions(+), 182 deletions(-) diff --git a/stix2/sources/filesystem.py b/stix2/sources/filesystem.py index e92c525..050ca23 100644 --- a/stix2/sources/filesystem.py +++ b/stix2/sources/filesystem.py @@ -1,8 +1,6 @@ """ Python STIX 2.0 FileSystem Source/Sink -TODO: - Test everything """ import json @@ -22,7 +20,12 @@ class FileSystemStore(DataStore): Args: stix_dir (str): path to directory of STIX objects - bundlify (bool): Whether to wrap objects in bundles when saving them. + allow_custom (bool): whether to allow custom STIX content to be + pushed/retrieved. Defaults to True for FileSystemSource side(retrieving data) + and False for FileSystemSink side(pushing data). However, when + parameter is supplied, it will be applied to both FileSystemSource + and FileSystemSink. + bundlify (bool): whether to wrap objects in bundles when saving them. Default: False. Attributes: @@ -30,10 +33,16 @@ class FileSystemStore(DataStore): sink (FileSystemSink): FileSystemSink """ - def __init__(self, stix_dir, bundlify=False): + def __init__(self, stix_dir, allow_custom=None, bundlify=False): + if not allow_custom: + allow_custom_source = True + allow_custom_sink = False + else: + allow_custom_sink = allow_custom_source = allow_custom + super(FileSystemStore, self).__init__( - source=FileSystemSource(stix_dir=stix_dir), - sink=FileSystemSink(stix_dir=stix_dir, bundlify=bundlify) + source=FileSystemSource(stix_dir=stix_dir, allow_custom=allow_custom_source), + sink=FileSystemSink(stix_dir=stix_dir, allow_custom=allow_custom_sink, bundlify=bundlify) ) @@ -46,13 +55,16 @@ class FileSystemSink(DataSink): Args: stix_dir (str): path to directory of STIX objects. + allow_custom (bool): Whether to allow custom STIX content to be + added to the FileSystemSource. Default: False bundlify (bool): Whether to wrap objects in bundles when saving them. Default: False. """ - def __init__(self, stix_dir, bundlify=False): + def __init__(self, stix_dir, allow_custom=False, bundlify=False): super(FileSystemSink, self).__init__() self._stix_dir = os.path.abspath(stix_dir) + self.allow_custom = allow_custom self.bundlify = bundlify if not os.path.exists(self._stix_dir): @@ -71,20 +83,18 @@ class FileSystemSink(DataSink): os.makedirs(os.path.dirname(path)) if self.bundlify: - stix_obj = Bundle(stix_obj) + stix_obj = Bundle(stix_obj, allow_custom=self.allow_custom) with open(path, "w") as f: f.write(str(stix_obj)) - def add(self, stix_data=None, allow_custom=False, version=None): + def add(self, stix_data=None, version=None): """Add STIX objects to file directory. Args: stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content in a STIX object (or list of), dict (or list of), or a STIX 2.0 json encoded string. - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -100,24 +110,24 @@ class FileSystemSink(DataSink): self._check_path_and_write(stix_data) elif isinstance(stix_data, (str, dict)): - stix_data = parse(stix_data, allow_custom=allow_custom, version=version) + stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version) if stix_data["type"] == "bundle": # extract STIX objects for stix_obj in stix_data.get("objects", []): - self.add(stix_obj, allow_custom=allow_custom, version=version) + self.add(stix_obj, version=version) else: # adding json-formatted STIX - self._check_path_and_write(stix_data) + self._check_path_and_write(stix_data,) elif isinstance(stix_data, Bundle): # recursively add individual STIX objects for stix_obj in stix_data.get("objects", []): - self.add(stix_obj, allow_custom=allow_custom, version=version) + self.add(stix_obj, version=version) elif isinstance(stix_data, list): # recursively add individual STIX objects for stix_obj in stix_data: - self.add(stix_obj, allow_custom=allow_custom, version=version) + self.add(stix_obj, version=version) else: raise TypeError("stix_data must be a STIX object (or list of), " @@ -134,11 +144,14 @@ class FileSystemSource(DataSource): Args: stix_dir (str): path to directory of STIX objects + allow_custom (bool): Whether to allow custom STIX content to be + added to the FileSystemSink. Default: True """ - def __init__(self, stix_dir): + def __init__(self, stix_dir, allow_custom=True): super(FileSystemSource, self).__init__() self._stix_dir = os.path.abspath(stix_dir) + self.allow_custom = allow_custom if not os.path.exists(self._stix_dir): raise ValueError("directory path for STIX data does not exist: %s" % self._stix_dir) @@ -147,15 +160,13 @@ class FileSystemSource(DataSource): def stix_dir(self): return self._stix_dir - def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None): + def get(self, stix_id, version=None, _composite_filters=None): """Retrieve STIX object from file directory via STIX ID. Args: stix_id (str): The STIX ID of the STIX object to be retrieved. _composite_filters (set): set of filters passed from the parent CompositeDataSource, not user supplied - allow_custom (bool): whether to retrieve custom objects/properties - or not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -167,7 +178,7 @@ class FileSystemSource(DataSource): """ query = [Filter("id", "=", stix_id)] - all_data = self.query(query=query, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters) + all_data = self.query(query=query, version=version, _composite_filters=_composite_filters) if all_data: stix_obj = sorted(all_data, key=lambda k: k['modified'])[0] @@ -176,7 +187,7 @@ class FileSystemSource(DataSource): return stix_obj - def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None): + def all_versions(self, stix_id, version=None, _composite_filters=None): """Retrieve STIX object from file directory via STIX ID, all versions. Note: Since FileSystem sources/sinks don't handle multiple versions @@ -186,8 +197,6 @@ class FileSystemSource(DataSource): stix_id (str): The STIX ID of the STIX objects to be retrieved. _composite_filters (set): set of filters passed from the parent CompositeDataSource, not user supplied - allow_custom (bool): whether to retrieve custom objects/properties - or not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -197,9 +206,9 @@ class FileSystemSource(DataSource): a python STIX objects and then returned """ - return [self.get(stix_id=stix_id, allow_custom=allow_custom, version=version, _composite_filters=_composite_filters)] + return [self.get(stix_id=stix_id, version=version, _composite_filters=_composite_filters)] - def query(self, query=None, allow_custom=False, version=None, _composite_filters=None): + def query(self, query=None, version=None, _composite_filters=None): """Search and retrieve STIX objects based on the complete query. A "complete query" includes the filters from the query, the filters @@ -210,8 +219,6 @@ class FileSystemSource(DataSource): query (list): list of filters to search on _composite_filters (set): set of filters passed from the CompositeDataSource, not user supplied - allow_custom (bool): whether to retrieve custom objects/properties - or not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -221,6 +228,7 @@ class FileSystemSource(DataSource): parsed into a python STIX objects and then returned. """ + all_data = [] if query is None: @@ -304,7 +312,7 @@ class FileSystemSource(DataSource): all_data = deduplicate(all_data) # parse python STIX objects from the STIX object dicts - stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom, version=version) for stix_obj_dict in all_data] + stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data] return stix_objs diff --git a/stix2/sources/memory.py b/stix2/sources/memory.py index 32a6756..f094f90 100644 --- a/stix2/sources/memory.py +++ b/stix2/sources/memory.py @@ -21,7 +21,7 @@ from stix2.sources import DataSink, DataSource, DataStore from stix2.sources.filters import Filter, apply_common_filters -def _add(store, stix_data=None, allow_custom=False, version=None): +def _add(store, stix_data=None, version=None): """Add STIX objects to MemoryStore/Sink. Adds STIX objects to an in-memory dictionary for fast lookup. @@ -29,8 +29,6 @@ def _add(store, stix_data=None, allow_custom=False, version=None): Args: stix_data (list OR dict OR STIX object): STIX objects to be added - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -43,28 +41,19 @@ def _add(store, stix_data=None, allow_custom=False, version=None): if stix_data["type"] == "bundle": # adding a json bundle - so just grab STIX objects for stix_obj in stix_data.get("objects", []): - _add(store, stix_obj, allow_custom=allow_custom, version=version) + _add(store, stix_obj, version=version) else: # adding a json STIX object store._data[stix_data["id"]] = stix_data - elif isinstance(stix_data, str): - # adding json encoded string of STIX content - stix_data = parse(stix_data, allow_custom=allow_custom, version=version) - if stix_data["type"] == "bundle": - # recurse on each STIX object in bundle - for stix_obj in stix_data.get("objects", []): - _add(store, stix_obj, allow_custom=allow_custom, version=version) - else: - _add(store, stix_data, allow_custom=allow_custom, version=version) - elif isinstance(stix_data, list): # STIX objects are in a list- recurse on each object for stix_obj in stix_data: - _add(store, stix_obj, allow_custom=allow_custom, version=version) + _add(store, stix_obj, version=version) else: - raise TypeError("stix_data must be a STIX object (or list of), JSON formatted STIX (or list of), or a JSON formatted STIX bundle") + raise TypeError("stix_data expected to be a python-stix2 object (or list of), JSON formatted STIX (or list of)," + " or a JSON formatted STIX bundle. stix_data was of type: " + str(type(stix_data))) class MemoryStore(DataStore): @@ -78,8 +67,9 @@ class MemoryStore(DataStore): Args: stix_data (list OR dict OR STIX object): STIX content to be added - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. + allow_custom (bool): whether to allow custom STIX content. + Only applied when export/input functions called, i.e. + load_from_file() and save_to_file(). Defaults to True. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -89,11 +79,11 @@ class MemoryStore(DataStore): sink (MemorySink): MemorySink """ - def __init__(self, stix_data=None, allow_custom=False, version=None): + def __init__(self, stix_data=None, allow_custom=True, version=None): self._data = {} if stix_data: - _add(self, stix_data, allow_custom=allow_custom, version=version) + _add(self, stix_data, version=version) super(MemoryStore, self).__init__( source=MemorySource(stix_data=self._data, allow_custom=allow_custom, version=version, _store=True), @@ -101,31 +91,11 @@ class MemoryStore(DataStore): ) def save_to_file(self, *args, **kwargs): - """Write SITX objects from in-memory dictionary to JSON file, as a STIX - Bundle. - - Args: - file_path (str): file path to write STIX data to - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. - - """ + """See MemorySink.save_to_file() for documentation""" return self.sink.save_to_file(*args, **kwargs) def load_from_file(self, *args, **kwargs): - """Load STIX data from JSON file. - - File format is expected to be a single JSON - STIX object or JSON STIX bundle. - - Args: - file_path (str): file path to load STIX data from - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. - version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If - None, use latest version. - - """ + """See MemorySource.load_from_file() for documentation""" return self.source.load_from_file(*args, **kwargs) @@ -138,11 +108,12 @@ class MemorySink(DataSink): Args: stix_data (dict OR list): valid STIX 2.0 content in bundle or a list. - _store (bool): if the MemorySink is a part of a DataStore, + _store (bool): whether the MemorySink is a part of a DataStore, in which case "stix_data" is a direct reference to shared memory with DataSource. Not user supplied - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. + allow_custom (bool): whether to allow custom objects/properties + when exporting STIX content to file. + Default: True. Attributes: _data (dict): the in-memory dict that holds STIX objects. @@ -150,25 +121,34 @@ class MemorySink(DataSink): a MemorySource """ - def __init__(self, stix_data=None, allow_custom=False, version=None, _store=False): + def __init__(self, stix_data=None, allow_custom=True, version=None, _store=False): super(MemorySink, self).__init__() self._data = {} + self.allow_custom = allow_custom if _store: self._data = stix_data elif stix_data: - _add(self, stix_data, allow_custom=allow_custom, version=version) + _add(self, stix_data, version=version) - def add(self, stix_data, allow_custom=False, version=None): - _add(self, stix_data, allow_custom=allow_custom, version=version) + def add(self, stix_data, version=None): + _add(self, stix_data, version=version) add.__doc__ = _add.__doc__ - def save_to_file(self, file_path, allow_custom=False): + def save_to_file(self, file_path): + """Write SITX objects from in-memory dictionary to JSON file, as a STIX + Bundle. + + Args: + file_path (str): file path to write STIX data to + + """ file_path = os.path.abspath(file_path) + if not os.path.exists(os.path.dirname(file_path)): os.makedirs(os.path.dirname(file_path)) with open(file_path, "w") as f: - f.write(str(Bundle(list(self._data.values()), allow_custom=allow_custom))) + f.write(str(Bundle(list(self._data.values()), allow_custom=self.allow_custom))) save_to_file.__doc__ = MemoryStore.save_to_file.__doc__ @@ -185,8 +165,9 @@ class MemorySource(DataSource): _store (bool): if the MemorySource is a part of a DataStore, in which case "stix_data" is a direct reference to shared memory with DataSink. Not user supplied - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. + allow_custom (bool): whether to allow custom objects/properties + when importing STIX content from file. + Default: True. Attributes: _data (dict): the in-memory dict that holds STIX objects. @@ -194,14 +175,15 @@ class MemorySource(DataSource): a MemorySink """ - def __init__(self, stix_data=None, allow_custom=False, version=None, _store=False): + def __init__(self, stix_data=None, allow_custom=True, version=None, _store=False): super(MemorySource, self).__init__() self._data = {} + self.allow_custom = allow_custom if _store: self._data = stix_data elif stix_data: - _add(self, stix_data, allow_custom=allow_custom, version=version) + _add(self, stix_data, version=version) def get(self, stix_id, _composite_filters=None): """Retrieve STIX object from in-memory dict via STIX ID. @@ -257,6 +239,7 @@ class MemorySource(DataSource): is returned in the same form as it as added """ + return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)] def query(self, query=None, _composite_filters=None): @@ -298,15 +281,24 @@ class MemorySource(DataSource): return all_data - def load_from_file(self, file_path, allow_custom=False, version=None): - """ Load JSON formatted STIX content from file and add to Memory.""" - file_path = os.path.abspath(file_path) + def load_from_file(self, file_path, version=None): + """Load STIX data from JSON file. - # converting the STIX content to JSON encoded string before calling - # _add() so that the STIX content is added as python-stix2 objects - # to the in-memory dict. Otherwise, if you pass a dict to _add(), - # it gets stored as a dict. - stix_data = json.dumps(json.load(open(file_path, "r"))) + File format is expected to be a single JSON + STIX object or JSON STIX bundle. - _add(self, stix_data, allow_custom=allow_custom, version=version) + Args: + file_path (str): file path to load STIX data from + version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If + None, use latest version. + + """ + stix_data = json.load(open(os.path.abspath(file_path), "r")) + + if stix_data["type"] == "bundle": + for stix_obj in stix_data["objects"]: + print(stix_obj) + _add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom, version=stix_data["spec_version"])) + else: + _add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom, version=version)) load_from_file.__doc__ = MemoryStore.load_from_file.__doc__ diff --git a/stix2/sources/taxii.py b/stix2/sources/taxii.py index 512fb58..5b5c53c 100644 --- a/stix2/sources/taxii.py +++ b/stix2/sources/taxii.py @@ -19,11 +19,23 @@ class TAXIICollectionStore(DataStore): Args: collection (taxii2.Collection): TAXII Collection instance + allow_custom (bool): whether to allow custom STIX content to be + pushed/retrieved. Defaults to True for TAXIICollectionSource + side(retrieving data) and False for TAXIICollectionSink + side(pushing data). However, when parameter is supplied, it will + be applied to both TAXIICollectionSource/Sink. + """ - def __init__(self, collection): + def __init__(self, collection, allow_custom=None): + if not allow_custom: + allow_custom_source = True + allow_custom_sink = False + else: + allow_custom_sink = allow_custom_source = allow_custom + super(TAXIICollectionStore, self).__init__( - source=TAXIICollectionSource(collection), - sink=TAXIICollectionSink(collection) + source=TAXIICollectionSource(collection, allow_custom=allow_custom_source), + sink=TAXIICollectionSink(collection, allow_custom=allow_custom_sink) ) @@ -33,48 +45,49 @@ class TAXIICollectionSink(DataSink): Args: collection (taxii2.Collection): TAXII2 Collection instance + allow_custom (bool): Whether to allow custom STIX content to be + added to the TAXIICollectionSink. Default: False """ - def __init__(self, collection): + def __init__(self, collection, allow_custom=False): super(TAXIICollectionSink, self).__init__() self.collection = collection + self.allow_custom = allow_custom - def add(self, stix_data, allow_custom=False, version=None): + def add(self, stix_data, version=None): """Add/push STIX content to TAXII Collection endpoint Args: stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content in a STIX object (or Bundle), STIX onject dict (or Bundle dict), or a STIX 2.0 json encoded string, or list of any of the following - allow_custom (bool): whether to allow custom objects/properties or - not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. """ if isinstance(stix_data, _STIXBase): # adding python STIX object - bundle = dict(Bundle(stix_data, allow_custom=allow_custom)) + bundle = dict(Bundle(stix_data, allow_custom=self.allow_custom)) elif isinstance(stix_data, dict): # adding python dict (of either Bundle or STIX obj) if stix_data["type"] == "bundle": bundle = stix_data else: - bundle = dict(Bundle(stix_data, allow_custom=allow_custom)) + bundle = dict(Bundle(stix_data, allow_custom=self.allow_custom)) elif isinstance(stix_data, list): # adding list of something - recurse on each for obj in stix_data: - self.add(obj, allow_custom=allow_custom, version=version) + self.add(obj, version=version) elif isinstance(stix_data, str): # adding json encoded string of STIX content - stix_data = parse(stix_data, allow_custom=allow_custom, version=version) + stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version) if stix_data["type"] == "bundle": bundle = dict(stix_data) else: - bundle = dict(Bundle(stix_data, allow_custom=allow_custom)) + bundle = dict(Bundle(stix_data, allow_custom=self.allow_custom)) else: raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle") @@ -88,13 +101,16 @@ class TAXIICollectionSource(DataSource): Args: collection (taxii2.Collection): TAXII Collection instance + allow_custom (bool): Whether to allow custom STIX content to be + added to the FileSystemSink. Default: True """ - def __init__(self, collection): + def __init__(self, collection, allow_custom=True): super(TAXIICollectionSource, self).__init__() self.collection = collection + self.allow_custom = allow_custom - def get(self, stix_id, allow_custom=False, version=None, _composite_filters=None): + def get(self, stix_id, version=None, _composite_filters=None): """Retrieve STIX object from local/remote STIX Collection endpoint. @@ -102,8 +118,6 @@ class TAXIICollectionSource(DataSource): stix_id (str): The STIX ID of the STIX object to be retrieved. _composite_filters (set): set of filters passed from the parent CompositeDataSource, not user supplied - allow_custom (bool): whether to retrieve custom objects/properties - or not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -131,7 +145,7 @@ class TAXIICollectionSource(DataSource): stix_obj = [] if len(stix_obj): - stix_obj = parse(stix_obj[0], allow_custom=allow_custom, version=version) + stix_obj = parse(stix_obj[0], allow_custom=self.allow_custom, version=version) if stix_obj.id != stix_id: # check - was added to handle erroneous TAXII servers stix_obj = None @@ -140,7 +154,7 @@ class TAXIICollectionSource(DataSource): return stix_obj - def all_versions(self, stix_id, allow_custom=False, version=None, _composite_filters=None): + def all_versions(self, stix_id, version=None, _composite_filters=None): """Retrieve STIX object from local/remote TAXII Collection endpoint, all versions of it @@ -148,8 +162,6 @@ class TAXIICollectionSource(DataSource): stix_id (str): The STIX ID of the STIX objects to be retrieved. _composite_filters (set): set of filters passed from the parent CompositeDataSource, not user supplied - allow_custom (bool): whether to retrieve custom objects/properties - or not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -163,17 +175,17 @@ class TAXIICollectionSource(DataSource): Filter("match[version]", "=", "all") ] - all_data = self.query(query=query, allow_custom=allow_custom, _composite_filters=_composite_filters) + all_data = self.query(query=query, _composite_filters=_composite_filters) # parse STIX objects from TAXII returned json - all_data = [parse(stix_obj, allow_custom=allow_custom, version=version) for stix_obj in all_data] + all_data = [parse(stix_obj, allow_custom=self.allow_custom, version=version) for stix_obj in all_data] # check - was added to handle erroneous TAXII servers all_data_clean = [stix_obj for stix_obj in all_data if stix_obj.id == stix_id] return all_data_clean - def query(self, query=None, allow_custom=False, version=None, _composite_filters=None): + def query(self, query=None, version=None, _composite_filters=None): """Search and retreive STIX objects based on the complete query A "complete query" includes the filters from the query, the filters @@ -184,8 +196,6 @@ class TAXIICollectionSource(DataSource): query (list): list of filters to search on _composite_filters (set): set of filters passed from the CompositeDataSource, not user supplied - allow_custom (bool): whether to retrieve custom objects/properties - or not. Default: False. version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If None, use latest version. @@ -228,7 +238,7 @@ class TAXIICollectionSource(DataSource): all_data = [] # parse python STIX objects from the STIX object dicts - stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom, version=version) for stix_obj_dict in all_data] + stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom, version=version) for stix_obj_dict in all_data] return stix_objs diff --git a/stix2/test/test_filesystem.py b/stix2/test/test_filesystem.py index 85f6966..729285a 100644 --- a/stix2/test/test_filesystem.py +++ b/stix2/test/test_filesystem.py @@ -340,7 +340,7 @@ def test_filesystem_object_with_custom_property(fs_store): fs_store.add(camp, True) - camp_r = fs_store.get(camp.id, allow_custom=True) + camp_r = fs_store.get(camp.id) assert camp_r.id == camp.id assert camp_r.x_empire == camp.x_empire @@ -352,9 +352,9 @@ def test_filesystem_object_with_custom_property_in_bundle(fs_store): allow_custom=True) bundle = Bundle(camp, allow_custom=True) - fs_store.add(bundle, allow_custom=True) + fs_store.add(bundle) - camp_r = fs_store.get(camp.id, allow_custom=True) + camp_r = fs_store.get(camp.id) assert camp_r.id == camp.id assert camp_r.x_empire == camp.x_empire @@ -367,9 +367,9 @@ def test_filesystem_custom_object(fs_store): pass newobj = NewObj(property1='something') - fs_store.add(newobj, allow_custom=True) + fs_store.add(newobj) - newobj_r = fs_store.get(newobj.id, allow_custom=True) + newobj_r = fs_store.get(newobj.id) assert newobj_r.id == newobj.id assert newobj_r.property1 == 'something' diff --git a/stix2/test/test_memory.py b/stix2/test/test_memory.py index 6b1219e..d5651d0 100644 --- a/stix2/test/test_memory.py +++ b/stix2/test/test_memory.py @@ -184,64 +184,64 @@ def test_memory_store_save_load_file(mem_store): shutil.rmtree(os.path.dirname(filename)) +# Currently deprecated - removed functionality from Memory API +# def test_memory_store_add_stix_object_str(mem_store): +# # add stix object string +# camp_id = "campaign--111111b6-1112-4fb0-111b-b111107ca70a" +# camp_name = "Aurelius" +# camp_alias = "Purple Robes" +# camp = """{ +# "name": "%s", +# "type": "campaign", +# "objective": "German and French Intelligence Services", +# "aliases": ["%s"], +# "id": "%s", +# "created": "2017-05-31T21:31:53.197755Z" +# }""" % (camp_name, camp_alias, camp_id) +# +# mem_store.add(camp) +# +# camp_r = mem_store.get(camp_id) +# assert camp_r["id"] == camp_id +# assert camp_r["name"] == camp_name +# assert camp_alias in camp_r["aliases"] -def test_memory_store_add_stix_object_str(mem_store): - # add stix object string - camp_id = "campaign--111111b6-1112-4fb0-111b-b111107ca70a" - camp_name = "Aurelius" - camp_alias = "Purple Robes" - camp = """{ - "name": "%s", - "type": "campaign", - "objective": "German and French Intelligence Services", - "aliases": ["%s"], - "id": "%s", - "created": "2017-05-31T21:31:53.197755Z" - }""" % (camp_name, camp_alias, camp_id) - - mem_store.add(camp) - - camp_r = mem_store.get(camp_id) - assert camp_r["id"] == camp_id - assert camp_r["name"] == camp_name - assert camp_alias in camp_r["aliases"] - - -def test_memory_store_add_stix_bundle_str(mem_store): - # add stix bundle string - camp_id = "campaign--133111b6-1112-4fb0-111b-b111107ca70a" - camp_name = "Atilla" - camp_alias = "Huns" - bund = """{ - "type": "bundle", - "id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a", - "spec_version": "2.0", - "objects": [ - { - "name": "%s", - "type": "campaign", - "objective": "Bulgarian, Albanian and Romanian Intelligence Services", - "aliases": ["%s"], - "id": "%s", - "created": "2017-05-31T21:31:53.197755Z" - } - ] - }""" % (camp_name, camp_alias, camp_id) - - mem_store.add(bund) - - camp_r = mem_store.get(camp_id) - assert camp_r["id"] == camp_id - assert camp_r["name"] == camp_name - assert camp_alias in camp_r["aliases"] +# Currently deprecated - removed functionality from Memory API +# def test_memory_store_add_stix_bundle_str(mem_store): +# # add stix bundle string +# camp_id = "campaign--133111b6-1112-4fb0-111b-b111107ca70a" +# camp_name = "Atilla" +# camp_alias = "Huns" +# bund = """{ +# "type": "bundle", +# "id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a", +# "spec_version": "2.0", +# "objects": [ +# { +# "name": "%s", +# "type": "campaign", +# "objective": "Bulgarian, Albanian and Romanian Intelligence Services", +# "aliases": ["%s"], +# "id": "%s", +# "created": "2017-05-31T21:31:53.197755Z" +# } +# ] +# }""" % (camp_name, camp_alias, camp_id) +# +# mem_store.add(bund) +# +# camp_r = mem_store.get(camp_id) +# assert camp_r["id"] == camp_id +# assert camp_r["name"] == camp_name +# assert camp_alias in camp_r["aliases"] def test_memory_store_add_invalid_object(mem_store): ind = ('indicator', IND1) # tuple isn't valid with pytest.raises(TypeError) as excinfo: mem_store.add(ind) - assert 'stix_data must be' in str(excinfo.value) - assert 'a STIX object' in str(excinfo.value) + assert 'stix_data expected to be' in str(excinfo.value) + assert 'a python-stix2 object' in str(excinfo.value) assert 'JSON formatted STIX' in str(excinfo.value) assert 'JSON formatted STIX bundle' in str(excinfo.value) diff --git a/stix2/test/test_versioning.py b/stix2/test/test_versioning.py index 8695a30..233587e 100644 --- a/stix2/test/test_versioning.py +++ b/stix2/test/test_versioning.py @@ -88,11 +88,15 @@ def test_versioning_error_bad_modified_value(): assert excinfo.value.cls == stix2.Campaign assert excinfo.value.prop_name == "modified" - assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime." + assert excinfo.value.reason == "The new modified datetime cannot be before than or equal to the current modified datetime." \ + "It cannot be equal, as according to STIX 2 specification, objects that are different " \ + "but have the same id and modified timestamp do not have defined consumer behavior." msg = "Invalid value for {0} '{1}': {2}" msg = msg.format(stix2.Campaign.__name__, "modified", - "The new modified datetime cannot be before the current modified datatime.") + "The new modified datetime cannot be before than or equal to the current modified datetime." + "It cannot be equal, as according to STIX 2 specification, objects that are different " + "but have the same id and modified timestamp do not have defined consumer behavior.") assert str(excinfo.value) == msg @@ -153,7 +157,9 @@ def test_versioning_error_dict_bad_modified_value(): assert excinfo.value.cls == dict assert excinfo.value.prop_name == "modified" - assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime." + assert excinfo.value.reason == "The new modified datetime cannot be before than or equal to the current modified datetime." \ + "It cannot be equal, as according to STIX 2 specification, objects that are different " \ + "but have the same id and modified timestamp do not have defined consumer behavior." def test_versioning_error_dict_no_modified_value(): @@ -206,3 +212,33 @@ def test_revoke_invalid_cls(): stix2.utils.revoke(campaign_v1) assert 'cannot revoke object of this type' in str(excinfo.value) + + +def test_remove_custom_stix_property(): + mal = stix2.Malware(name="ColePowers", + labels=["rootkit"], + x_custom="armada", + allow_custom=True) + + mal_nc = stix2.utils.remove_custom_stix(mal) + + assert "x_custom" not in mal_nc + assert stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") < stix2.utils.parse_into_datetime(mal_nc["modified"], + precision="millisecond") + + +def test_remove_custom_stix_object(): + @stix2.CustomObject("x-animal", [ + ("species", stix2.properties.StringProperty(required=True)), + ("animal_class", stix2.properties.StringProperty()), + ]) + class Animal(object): + def __init__(self, animal_class=None, **kwargs): + if animal_class and animal_class not in ["mammal", "bird"]: + raise ValueError("Not a recognized class of animal") + + animal = Animal(species="lion", animal_class="mammal") + + nc = stix2.utils.remove_custom_stix(animal) + + assert nc is None diff --git a/stix2/utils.py b/stix2/utils.py index 974fd94..10de8b0 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -232,9 +232,9 @@ def new_version(data, **kwargs): new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond') if new_modified_property <= old_modified_property: raise InvalidValueError(cls, 'modified', - "The new modified datetime cannot be before the current modified datetime. The new modified time can " - "also not be equal to the current modified datetime because if a consumer receives two objects that are " - "different, but have the same id and modified timestamp, it is not defined how the consumer handles the objects.") + "The new modified datetime cannot be before than or equal to the current modified datetime." + "It cannot be equal, as according to STIX 2 specification, objects that are different " + "but have the same id and modified timestamp do not have defined consumer behavior.") new_obj_inner.update(kwargs) # Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass return cls(**{k: v for k, v in new_obj_inner.items() if v is not None}) @@ -269,9 +269,12 @@ def remove_custom_stix(stix_obj): Args: stix_obj (dict OR python-stix obj): a single python-stix object or dict of a STIX object + + Returns: + A new version of the object with any custom content removed """ - if stix_obj["type"].startswith("x_"): + if stix_obj["type"].startswith("x-"): # if entire object is custom, discard return None