Merge pull request #385 from emmanvg/taxii-datastore-updates

Test Datastore TAXII Updates
master
Chris Lenk 2020-04-03 17:30:04 -04:00 committed by GitHub
commit 8c4204de74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 16 deletions

View File

@ -12,7 +12,7 @@ from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.utils import deduplicate
try:
from taxii2client import ValidationError
from taxii2client.exceptions import ValidationError
_taxii2_client = True
except ImportError:
_taxii2_client = False

View File

@ -114,6 +114,42 @@ def stix_objs1():
return [ind1, ind2, ind3, ind4, ind5]
@pytest.fixture
def stix_objs1_manifests():
# Tests against latest medallion (TAXII 2.1)
ind1 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000001",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
ind2 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000001",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
ind3 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000001",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.936Z",
}
ind4 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000002",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
ind5 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000002",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
return [ind1, ind2, ind3, ind4, ind5]
@pytest.fixture
def stix_objs2():
ind6 = {

View File

@ -4,12 +4,13 @@ from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
import six
from taxii2client import Collection
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v20 import Collection
import stix2
from stix2.datastore import DataSourceError
from stix2.datastore.filters import Filter
from stix2.utils import get_timestamp
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -22,6 +23,7 @@ class MockTAXIICollectionEndpoint(Collection):
url, collection_info=collection_info,
)
self.objects = []
self.manifests = []
def add_objects(self, bundle):
self._verify_can_write()
@ -29,6 +31,14 @@ class MockTAXIICollectionEndpoint(Collection):
bundle = json.loads(bundle, encoding='utf-8')
for object in bundle.get("objects", []):
self.objects.append(object)
self.manifests.append(
{
"date_added": get_timestamp(),
"id": object["id"],
"media_type": "application/stix+json;version=2.1",
"version": object.get("modified", object.get("created", get_timestamp())),
},
)
def get_objects(self, **filter_kwargs):
self._verify_can_read()
@ -38,8 +48,8 @@ class MockTAXIICollectionEndpoint(Collection):
objs = full_filter.process_filter(
self.objects,
("id", "type", "version"),
[],
None,
self.manifests,
100,
)[0]
if objs:
return stix2.v20.Bundle(objects=objs)
@ -60,8 +70,8 @@ class MockTAXIICollectionEndpoint(Collection):
filtered_objects = full_filter.process_filter(
objects,
("version",),
[],
None,
self.manifests,
100,
)[0]
else:
filtered_objects = []
@ -74,7 +84,7 @@ class MockTAXIICollectionEndpoint(Collection):
@pytest.fixture
def collection(stix_objs1):
def collection(stix_objs1, stix_objs1_manifests):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
@ -89,11 +99,12 @@ def collection(stix_objs1):
)
mock.objects.extend(stix_objs1)
mock.manifests.extend(stix_objs1_manifests)
return mock
@pytest.fixture
def collection_no_rw_access(stix_objs1):
def collection_no_rw_access(stix_objs1, stix_objs1_manifests):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
@ -108,6 +119,7 @@ def collection_no_rw_access(stix_objs1):
)
mock.objects.extend(stix_objs1)
mock.manifests.extend(stix_objs1_manifests)
return mock

View File

@ -135,6 +135,42 @@ def stix_objs1():
return [ind1, ind2, ind3, ind4, ind5]
@pytest.fixture
def stix_objs1_manifests():
# Tests against latest medallion (TAXII 2.1)
ind1 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000001",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
ind2 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000001",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
ind3 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000001",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.936Z",
}
ind4 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000002",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
ind5 = {
"date_added": "2017-01-27T13:49:53.935Z",
"id": "indicator--00000000-0000-4000-8000-000000000002",
"media_type": "application/stix+json;version=2.1",
"version": "2017-01-27T13:49:53.935Z",
}
return [ind1, ind2, ind3, ind4, ind5]
@pytest.fixture
def stix_objs2():
ind6 = {

View File

@ -4,12 +4,13 @@ from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
import six
from taxii2client import Collection
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v21 import Collection
import stix2
from stix2.datastore import DataSourceError
from stix2.datastore.filters import Filter
from stix2.utils import get_timestamp
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -22,6 +23,7 @@ class MockTAXIICollectionEndpoint(Collection):
url, collection_info=collection_info,
)
self.objects = []
self.manifests = []
def add_objects(self, bundle):
self._verify_can_write()
@ -29,6 +31,14 @@ class MockTAXIICollectionEndpoint(Collection):
bundle = json.loads(bundle, encoding='utf-8')
for object in bundle.get("objects", []):
self.objects.append(object)
self.manifests.append(
{
"date_added": get_timestamp(),
"id": object["id"],
"media_type": "application/stix+json;version=2.1",
"version": object.get("modified", object.get("created", get_timestamp())),
},
)
def get_objects(self, **filter_kwargs):
self._verify_can_read()
@ -38,11 +48,10 @@ class MockTAXIICollectionEndpoint(Collection):
objs = full_filter.process_filter(
self.objects,
("id", "type", "version"),
[],
None,
self.manifests,
100,
)[0]
if objs:
print(objs)
return stix2.v21.Bundle(objects=objs)
else:
resp = Response()
@ -61,8 +70,8 @@ class MockTAXIICollectionEndpoint(Collection):
filtered_objects = full_filter.process_filter(
objects,
("version",),
[],
None,
self.manifests,
100,
)[0]
else:
filtered_objects = []
@ -75,7 +84,7 @@ class MockTAXIICollectionEndpoint(Collection):
@pytest.fixture
def collection(stix_objs1):
def collection(stix_objs1, stix_objs1_manifests):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
@ -90,11 +99,12 @@ def collection(stix_objs1):
)
mock.objects.extend(stix_objs1)
mock.manifests.extend(stix_objs1_manifests)
return mock
@pytest.fixture
def collection_no_rw_access(stix_objs1):
def collection_no_rw_access(stix_objs1, stix_objs1_manifests):
mock = MockTAXIICollectionEndpoint(
COLLECTION_URL, {
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
@ -109,6 +119,7 @@ def collection_no_rw_access(stix_objs1):
)
mock.objects.extend(stix_objs1)
mock.manifests.extend(stix_objs1_manifests)
return mock