Fix memory datastore to pass tests

1) 'stix_data' is now optional; you can set up a MemorySink without
needing a starting set of data.
2) Removed stix2-validator calls. The validator fails when called on our
_STIXBase-derived classes because we store timestamps as datetime
objects, while the validator expects strings. Also, this was the only
datastore that used the validator, and we should be consistent across
all of them. The validator can be added back in later.
stix2.1
Chris Lenk 2017-09-08 12:49:08 -04:00
parent be07c32500
commit 190b639126
1 changed files with 9 additions and 25 deletions

View File

@ -22,33 +22,22 @@ import collections
import json import json
import os import os
from stix2validator import validate_instance
from stix2 import Bundle from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter from stix2.sources.filters import Filter
def _add(store, stix_data): def _add(store, stix_data=None):
"""Adds stix objects to MemoryStore/Source/Sink.""" """Adds stix objects to MemoryStore/Source/Sink."""
if isinstance(stix_data, collections.Mapping): if isinstance(stix_data, collections.Mapping):
# stix objects are in a bundle # stix objects are in a bundle
# verify STIX json data
r = validate_instance(stix_data)
# make dictionary of the objects for easy lookup # make dictionary of the objects for easy lookup
if r.is_valid: for stix_obj in stix_data["objects"]:
for stix_obj in stix_data["objects"]: store.data[stix_obj["id"]] = stix_obj
store.data[stix_obj["id"]] = stix_obj
else:
raise ValueError("Error: data passed was found to not be valid by the STIX 2 Validator: \n%s", r.as_dict())
elif isinstance(stix_data, list): elif isinstance(stix_data, list):
# stix objects are in a list # stix objects are in a list
for stix_obj in stix_data: for stix_obj in stix_data:
r = validate_instance(stix_obj) store.data[stix_obj["id"]] = stix_obj
if r.is_valid:
store.data[stix_obj["id"]] = stix_obj
else:
raise ValueError("Error: STIX object %s is not valid under STIX 2 validator.\n%s", stix_obj["id"], r)
else: else:
raise ValueError("stix_data must be in bundle format or raw list") raise ValueError("stix_data must be in bundle format or raw list")
@ -56,7 +45,7 @@ def _add(store, stix_data):
class MemoryStore(DataStore): class MemoryStore(DataStore):
""" """
""" """
def __init__(self, stix_data): def __init__(self, stix_data=None):
""" """
Notes: Notes:
It doesn't make sense to create a MemoryStore by passing It doesn't make sense to create a MemoryStore by passing
@ -83,7 +72,7 @@ class MemoryStore(DataStore):
class MemorySink(DataSink): class MemorySink(DataSink):
""" """
""" """
def __init__(self, stix_data, _store=False): def __init__(self, stix_data=None, _store=False):
""" """
Args: Args:
stix_data (dictionary OR list): valid STIX 2.0 content in stix_data (dictionary OR list): valid STIX 2.0 content in
@ -114,7 +103,7 @@ class MemorySink(DataSink):
class MemorySource(DataSource): class MemorySource(DataSource):
def __init__(self, stix_data, _store=False): def __init__(self, stix_data=None, _store=False):
""" """
Args: Args:
stix_data (dictionary OR list): valid STIX 2.0 content in stix_data (dictionary OR list): valid STIX 2.0 content in
@ -193,10 +182,5 @@ class MemorySource(DataSource):
file_path = os.path.abspath(file_path) file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r")) stix_data = json.load(open(file_path, "r"))
r = validate_instance(stix_data) for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
raise ValueError("Error: STIX data loaded from file (%s) was found to not be validated by STIX 2 Validator.\n%s", file_path, r)