commit
740fd63734
|
@ -43,6 +43,7 @@ htmlcov/
|
|||
nosetests.xml
|
||||
coverage.xml
|
||||
*,cover
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
|
|
|
@ -3,6 +3,7 @@ not_skip = __init__.py
|
|||
skip = workbench.py
|
||||
known_third_party =
|
||||
dateutil,
|
||||
medallion,
|
||||
ordereddict,
|
||||
pytest,
|
||||
pytz,
|
||||
|
|
|
@ -356,16 +356,16 @@
|
|||
"TAXII2 filter fields:\n",
|
||||
"\n",
|
||||
"* added_after\n",
|
||||
"* match[id]\n",
|
||||
"* match[type]\n",
|
||||
"* match[version]\n",
|
||||
"* id\n",
|
||||
"* type\n",
|
||||
"* version\n",
|
||||
"\n",
|
||||
"Supported operators:\n",
|
||||
"\n",
|
||||
"* =\n",
|
||||
"* !=\n",
|
||||
"* in\n",
|
||||
"* >\n",
|
||||
"* ```>```\n",
|
||||
"* < \n",
|
||||
"* ```>=```\n",
|
||||
"* <=\n",
|
||||
|
|
|
@ -1,54 +1,39 @@
|
|||
import json
|
||||
from taxii2client import Collection
|
||||
|
||||
from stix2.datastore.taxii import TAXIIDataSource
|
||||
import stix2
|
||||
|
||||
# Flask TAXII server - developmental
|
||||
ROOT = 'http://localhost:5000'
|
||||
AUTH = {'user': 'mk', 'pass': 'Pass'}
|
||||
# This example is based on the medallion server with default_data.json
|
||||
# See https://github.com/oasis-open/cti-taxii-server for more information
|
||||
|
||||
|
||||
def main():
|
||||
collection = Collection("http://127.0.0.1:5000/trustgroup1/collections/52892447-4d7e-4f70-b94d-d7f22742ff63/",
|
||||
user="admin", password="Password0")
|
||||
|
||||
# instantiate TAXII data source
|
||||
taxii = TAXIIDataSource(api_root=ROOT, auth=AUTH)
|
||||
taxii = stix2.TAXIICollectionSource(collection)
|
||||
|
||||
# get (file watch indicator)
|
||||
indicator_fw = taxii.get(id_="indicator--a932fcc6-e032-176c-126f-cb970a5a1ade")
|
||||
# get (url watch indicator)
|
||||
indicator_fw = taxii.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
print("\n\n-------Queried for Indicator - got:")
|
||||
print(json.dumps(indicator_fw, indent=4))
|
||||
print(indicator_fw.serialize(indent=4))
|
||||
|
||||
# all versions (file watch indicator - currently only 1. maybe Emmanuelle can add a version)
|
||||
indicator_fw_versions = taxii.get(id_="indicator--a932fcc6-e032-176c-126f-cb970a5a1ade")
|
||||
# all versions (url watch indicator - currently two)
|
||||
indicator_fw_versions = taxii.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
print("\n\n------Queried for indicator (all_versions()) - got:")
|
||||
print(json.dumps(indicator_fw_versions, indent=4))
|
||||
for indicator in indicator_fw_versions:
|
||||
print(indicator.serialize(indent=4))
|
||||
|
||||
# add TAXII filter (ie filter should be passed to TAXII)
|
||||
taxii_filter_ids, status = taxii.add_filter(
|
||||
[
|
||||
{
|
||||
"field": "type",
|
||||
"op": "in",
|
||||
"value": "malware"
|
||||
}
|
||||
])
|
||||
query_filter = stix2.Filter("type", "in", "malware")
|
||||
|
||||
print("\n\n-------Added filter:")
|
||||
print("Filter ID: {0}".format(taxii_filter_ids[0]))
|
||||
print("Filter status: \n")
|
||||
print(json.dumps(status, indent=4))
|
||||
print("filters: \n")
|
||||
print(json.dumps(taxii.get_filters(), indent=4))
|
||||
|
||||
# get() - but with filter attached
|
||||
malware = taxii.query()
|
||||
# query() - but with filter attached. There are no malware objects in this collection
|
||||
malwares = taxii.query(query=query_filter)
|
||||
print("\n\n\n--------Queried for Malware string (with above filter attached) - got:")
|
||||
print(json.dumps(malware, indent=4))
|
||||
|
||||
# remove TAXII filter
|
||||
taxii.remove_filter(taxii_filter_ids)
|
||||
print("\n\n-------Removed filter(TAXII filter):")
|
||||
print("filters: \n")
|
||||
print(json.dumps(taxii.get_filters(), indent=4))
|
||||
for malware in malwares:
|
||||
print(malware.serialize(indent=4))
|
||||
if not malwares:
|
||||
print(malwares)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -268,10 +268,7 @@ class MemorySource(DataSource):
|
|||
is returned in the same form as it as added.
|
||||
|
||||
"""
|
||||
if query is None:
|
||||
query = FilterSet()
|
||||
else:
|
||||
query = FilterSet(query)
|
||||
query = FilterSet(query)
|
||||
|
||||
# combine all query filters
|
||||
if self.filters:
|
||||
|
|
|
@ -176,8 +176,8 @@ class TAXIICollectionSource(DataSource):
|
|||
"""
|
||||
# make query in TAXII query format since 'id' is TAXII field
|
||||
query = [
|
||||
Filter("match[id]", "=", stix_id),
|
||||
Filter("match[version]", "=", "all")
|
||||
Filter("id", "=", stix_id),
|
||||
Filter("version", "=", "all")
|
||||
]
|
||||
|
||||
all_data = self.query(query=query, _composite_filters=_composite_filters)
|
||||
|
@ -232,7 +232,8 @@ class TAXIICollectionSource(DataSource):
|
|||
all_data = deduplicate(all_data)
|
||||
|
||||
# apply local (CompositeDataSource, TAXIICollectionSource and query) filters
|
||||
all_data = list(apply_common_filters(all_data, (query - taxii_filters)))
|
||||
query.remove(taxii_filters)
|
||||
all_data = list(apply_common_filters(all_data, query))
|
||||
|
||||
except HTTPError:
|
||||
# if resources not found or access is denied from TAXII server, return empty list
|
||||
|
@ -249,7 +250,7 @@ class TAXIICollectionSource(DataSource):
|
|||
Does not put in TAXII spec format as the TAXII2Client (that we use)
|
||||
does this for us.
|
||||
|
||||
NOTE:
|
||||
Notes:
|
||||
Currently, the TAXII2Client can handle TAXII filters where the
|
||||
filter value is list, as both a comma-seperated string or python list
|
||||
|
||||
|
@ -265,7 +266,8 @@ class TAXIICollectionSource(DataSource):
|
|||
query (list): list of filters to extract which ones are TAXII
|
||||
specific.
|
||||
|
||||
Returns: a list of the TAXII filters
|
||||
Returns:
|
||||
A list of TAXII filters that meet the TAXII filtering parameters.
|
||||
|
||||
"""
|
||||
taxii_filters = []
|
||||
|
|
|
@ -46,3 +46,114 @@ def malware(uuid4, clock):
|
|||
@pytest.fixture
|
||||
def relationship(uuid4, clock):
|
||||
return stix2.Relationship(**RELATIONSHIP_KWARGS)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def stix_objs1():
|
||||
ind1 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
ind2 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
ind3 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.936Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
ind4 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
ind5 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
return [ind1, ind2, ind3, ind4, ind5]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def stix_objs2():
|
||||
ind6 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-31T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
ind7 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
ind8 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
return [ind6, ind7, ind8]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def real_stix_objs2(stix_objs2):
|
||||
return [stix2.parse(x) for x in stix_objs2]
|
||||
|
|
|
@ -1,765 +1,117 @@
|
|||
import pytest
|
||||
from taxii2client import Collection
|
||||
|
||||
from stix2 import Filter, MemorySink, MemorySource
|
||||
from stix2.core import parse
|
||||
from stix2.datastore import (CompositeDataSource, DataSink, DataSource,
|
||||
make_id, taxii)
|
||||
from stix2.datastore.filters import apply_common_filters
|
||||
from stix2.utils import STIXdatetime, deduplicate, parse_into_datetime
|
||||
|
||||
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
|
||||
DataStoreMixin)
|
||||
from stix2.datastore.filters import Filter
|
||||
from stix2.test.constants import CAMPAIGN_MORE_KWARGS
|
||||
|
||||
|
||||
class MockTAXIIClient(object):
|
||||
"""Mock for taxii2_client.TAXIIClient"""
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def collection():
|
||||
return Collection(COLLECTION_URL, MockTAXIIClient())
|
||||
|
||||
|
||||
stix_objs = [
|
||||
{
|
||||
"created": "2017-01-27T13:49:53.997Z",
|
||||
"description": "\n\nTITLE:\n\tPoison Ivy",
|
||||
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
|
||||
"labels": [
|
||||
"remote-access-trojan"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.997Z",
|
||||
"name": "Poison Ivy",
|
||||
"type": "malware"
|
||||
},
|
||||
{
|
||||
"created": "2014-05-08T09:00:00.000Z",
|
||||
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
|
||||
"labels": [
|
||||
"file-hash-watchlist"
|
||||
],
|
||||
"modified": "2014-05-08T09:00:00.000Z",
|
||||
"name": "File hash for Poison Ivy variant",
|
||||
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2014-05-08T09:00:00.000000Z"
|
||||
},
|
||||
{
|
||||
"created": "2014-05-08T09:00:00.000Z",
|
||||
"granular_markings": [
|
||||
{
|
||||
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
|
||||
"selectors": [
|
||||
"relationship_type"
|
||||
]
|
||||
}
|
||||
],
|
||||
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
|
||||
"modified": "2014-05-08T09:00:00.000Z",
|
||||
"object_marking_refs": [
|
||||
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
|
||||
],
|
||||
"relationship_type": "indicates",
|
||||
"revoked": True,
|
||||
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
|
||||
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
|
||||
"type": "relationship"
|
||||
},
|
||||
{
|
||||
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
|
||||
"created": "2016-02-14T00:00:00.000Z",
|
||||
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
|
||||
"modified": "2016-02-14T00:00:00.000Z",
|
||||
"type": "vulnerability",
|
||||
"name": "CVE-2014-0160",
|
||||
"description": "The (1) TLS...",
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "cve",
|
||||
"external_id": "CVE-2014-0160"
|
||||
}
|
||||
],
|
||||
"labels": ["heartbleed", "has-logo"]
|
||||
},
|
||||
{
|
||||
"type": "observed-data",
|
||||
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
"created": "2016-04-06T19:58:16.000Z",
|
||||
"modified": "2016-04-06T19:58:16.000Z",
|
||||
"first_observed": "2015-12-21T19:00:00Z",
|
||||
"last_observed": "2015-12-21T19:00:00Z",
|
||||
"number_observed": 1,
|
||||
"objects": {
|
||||
"0": {
|
||||
"type": "file",
|
||||
"name": "HAL 9000.exe"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
]
|
||||
|
||||
# same as above objects but converted to real Python STIX2 objects
|
||||
# to test filters against true Python STIX2 objects
|
||||
real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs]
|
||||
|
||||
filters = [
|
||||
Filter("type", "!=", "relationship"),
|
||||
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
|
||||
Filter("labels", "in", "remote-access-trojan"),
|
||||
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
|
||||
Filter("revoked", "=", True),
|
||||
Filter("revoked", "!=", True),
|
||||
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
|
||||
Filter("granular_markings.selectors", "in", "relationship_type"),
|
||||
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
|
||||
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
|
||||
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
|
||||
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
|
||||
Filter("granular_markings.selectors", "in", "description"),
|
||||
Filter("external_references.source_name", "=", "CVE"),
|
||||
Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}})
|
||||
]
|
||||
|
||||
IND1 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND2 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND3 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.936Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND4 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND5 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND6 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-31T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND7 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
IND8 = {
|
||||
"created": "2017-01-27T13:49:53.935Z",
|
||||
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||
"labels": [
|
||||
"url-watchlist"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.935Z",
|
||||
"name": "Malicious site hosting downloader",
|
||||
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||
}
|
||||
|
||||
STIX_OBJS2 = [IND6, IND7, IND8]
|
||||
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
|
||||
|
||||
REAL_STIX_OBJS2 = [parse(IND6), parse(IND7), parse(IND8)]
|
||||
REAL_STIX_OBJS1 = [parse(IND1), parse(IND2), parse(IND3), parse(IND4), parse(IND5)]
|
||||
|
||||
|
||||
def test_ds_abstract_class_smoke():
|
||||
def test_datasource_abstract_class_raises_error():
|
||||
with pytest.raises(TypeError):
|
||||
DataSource()
|
||||
|
||||
|
||||
def test_datasink_abstract_class_raises_error():
|
||||
with pytest.raises(TypeError):
|
||||
DataSink()
|
||||
|
||||
|
||||
def test_ds_taxii(collection):
|
||||
ds = taxii.TAXIICollectionSource(collection)
|
||||
assert ds.collection is not None
|
||||
def test_datastore_smoke():
|
||||
assert DataStoreMixin() is not None
|
||||
|
||||
|
||||
def test_ds_taxii_name(collection):
|
||||
ds = taxii.TAXIICollectionSource(collection)
|
||||
assert ds.collection is not None
|
||||
def test_datastore_get_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert "DataStoreMixin has no data source to query" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_parse_taxii_filters():
|
||||
query = [
|
||||
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
|
||||
Filter("id", "=", "taxii stix object ID"),
|
||||
Filter("type", "=", "taxii stix object ID"),
|
||||
Filter("version", "=", "first"),
|
||||
Filter("created_by_ref", "=", "Bane"),
|
||||
]
|
||||
|
||||
taxii_filters_expected = [
|
||||
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
|
||||
Filter("id", "=", "taxii stix object ID"),
|
||||
Filter("type", "=", "taxii stix object ID"),
|
||||
Filter("version", "=", "first")
|
||||
]
|
||||
|
||||
ds = taxii.TAXIICollectionSource(collection)
|
||||
|
||||
taxii_filters = ds._parse_taxii_filters(query)
|
||||
|
||||
assert taxii_filters == taxii_filters_expected
|
||||
def test_datastore_all_versions_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert "DataStoreMixin has no data source to query" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_add_get_remove_filter():
|
||||
ds = taxii.TAXIICollectionSource(collection)
|
||||
|
||||
# First 3 filters are valid, remaining properties are erroneous in some way
|
||||
valid_filters = [
|
||||
Filter('type', '=', 'malware'),
|
||||
Filter('id', '!=', 'stix object id'),
|
||||
Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
|
||||
]
|
||||
|
||||
assert len(ds.filters) == 0
|
||||
|
||||
ds.filters.add(valid_filters[0])
|
||||
assert len(ds.filters) == 1
|
||||
|
||||
# Addin the same filter again will have no effect since `filters` acts like a set
|
||||
ds.filters.add(valid_filters[0])
|
||||
assert len(ds.filters) == 1
|
||||
|
||||
ds.filters.add(valid_filters[1])
|
||||
assert len(ds.filters) == 2
|
||||
ds.filters.add(valid_filters[2])
|
||||
assert len(ds.filters) == 3
|
||||
|
||||
assert valid_filters == [f for f in ds.filters]
|
||||
|
||||
# remove
|
||||
ds.filters.remove(valid_filters[0])
|
||||
|
||||
assert len(ds.filters) == 2
|
||||
|
||||
ds.filters.add(valid_filters)
|
||||
def test_datastore_query_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().query([Filter("type", "=", "indicator")])
|
||||
assert "DataStoreMixin has no data source to query" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_filter_ops_check():
|
||||
# invalid filters - non supported operators
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
# create Filter that has an operator that is not allowed
|
||||
Filter('modified', '*', 'not supported operator')
|
||||
assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'"
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
Filter("type", "%", "4")
|
||||
assert "Filter operator '%' not supported for specified property" in str(excinfo.value)
|
||||
def test_datastore_creator_of_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().creator_of(CAMPAIGN_MORE_KWARGS)
|
||||
assert "DataStoreMixin has no data source to query" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_filter_value_type_check():
|
||||
# invalid filters - non supported value types
|
||||
def test_datastore_relationships_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().relationships(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
target_only=True)
|
||||
assert "DataStoreMixin has no data source to query" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_datastore_related_to_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().related_to(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
target_only=True)
|
||||
assert "DataStoreMixin has no data source to query" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_datastore_add_raises():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
DataStoreMixin().add(CAMPAIGN_MORE_KWARGS)
|
||||
assert "DataStoreMixin has no data sink to put objects in" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_get_raises_error():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
CompositeDataSource().get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert "CompositeDataSource has no data sources" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_all_versions_raises_error():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
CompositeDataSource().all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert "CompositeDataSource has no data sources" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_query_raises_error():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
CompositeDataSource().query([Filter("type", "=", "indicator")])
|
||||
assert "CompositeDataSource has no data sources" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_relationships_raises_error():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
CompositeDataSource().relationships(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
target_only=True)
|
||||
assert "CompositeDataSource has no data sources" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_related_to_raises_error():
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
CompositeDataSource().related_to(obj="indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||
target_only=True)
|
||||
assert "CompositeDataSource has no data sources" == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_add_data_source_raises_error():
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
Filter('created', '=', object())
|
||||
# On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`.
|
||||
assert any([s in str(excinfo.value) for s in ["<type 'object'>", "'<class 'object'>'"]])
|
||||
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
|
||||
ind = "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
|
||||
CompositeDataSource().add_data_source(ind)
|
||||
assert "DataSource (to be added) is not of type stix2.DataSource. DataSource type is '{}'".format(type(ind)) == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_add_data_sources_raises_error():
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
Filter("type", "=", complex(2, -1))
|
||||
assert any([s in str(excinfo.value) for s in ["<type 'complex'>", "'<class 'complex'>'"]])
|
||||
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
Filter("type", "=", set([16, 23]))
|
||||
assert any([s in str(excinfo.value) for s in ["<type 'set'>", "'<class 'set'>'"]])
|
||||
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_filter_type_underscore_check():
|
||||
# check that Filters where property="type", value (name) doesnt have underscores
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
Filter("type", "=", "oh_underscore")
|
||||
assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_apply_common_filters0():
|
||||
# "Return any object whose type is not relationship"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[0]]))
|
||||
ids = [r['id'] for r in resp]
|
||||
assert stix_objs[0]['id'] in ids
|
||||
assert stix_objs[1]['id'] in ids
|
||||
assert stix_objs[3]['id'] in ids
|
||||
assert len(ids) == 4
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[0]]))
|
||||
ids = [r.id for r in resp]
|
||||
assert real_stix_objs[0].id in ids
|
||||
assert real_stix_objs[1].id in ids
|
||||
assert real_stix_objs[3].id in ids
|
||||
assert len(ids) == 4
|
||||
|
||||
|
||||
def test_apply_common_filters1():
|
||||
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[1]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[1]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters2():
|
||||
# "Return any object that contains remote-access-trojan in labels"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[2]]))
|
||||
assert resp[0]['id'] == stix_objs[0]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[2]]))
|
||||
assert resp[0].id == real_stix_objs[0].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters3():
|
||||
# "Return any object created after 2015-01-01T01:00:00.000Z"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[3]]))
|
||||
assert resp[0]['id'] == stix_objs[0]['id']
|
||||
assert len(resp) == 3
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[3]]))
|
||||
assert resp[0].id == real_stix_objs[0].id
|
||||
assert len(resp) == 3
|
||||
|
||||
|
||||
def test_apply_common_filters4():
|
||||
# "Return any revoked object"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[4]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[4]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters5():
|
||||
# "Return any object whose not revoked"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[5]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[5]]))
|
||||
assert len(resp) == 4
|
||||
|
||||
|
||||
def test_apply_common_filters6():
|
||||
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[6]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[6]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters7():
|
||||
# "Return any object that contains relationship_type in their selectors AND
|
||||
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters8():
|
||||
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[9]]))
|
||||
assert resp[0]['id'] == stix_objs[3]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[9]]))
|
||||
assert resp[0].id == real_stix_objs[3].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters9():
|
||||
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[10]]))
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[10]]))
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters10():
|
||||
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
|
||||
resp = list(apply_common_filters(stix_objs, [filters[11]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[11]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
|
||||
def test_apply_common_filters11():
|
||||
# "Return any object that contains description in its selectors" (None)
|
||||
resp = list(apply_common_filters(stix_objs, [filters[12]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[12]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
|
||||
def test_apply_common_filters12():
|
||||
# "Return any object that matches CVE in source_name" (None, case sensitive)
|
||||
resp = list(apply_common_filters(stix_objs, [filters[13]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[13]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
|
||||
def test_apply_common_filters13():
|
||||
# Return any object that matches file object in "objects"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[14]]))
|
||||
assert resp[0]["id"] == stix_objs[4]["id"]
|
||||
assert len(resp) == 1
|
||||
# important additional check to make sure original File dict was
|
||||
# not converted to File object. (this was a deep bug found)
|
||||
assert isinstance(resp[0]["objects"]["0"], dict)
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[14]]))
|
||||
assert resp[0].id == real_stix_objs[4].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_datetime_filter_behavior():
|
||||
"""if a filter is initialized with its value being a datetime object
|
||||
OR the STIX object property being filtered on is a datetime object, all
|
||||
resulting comparisons executed are done on the string representations
|
||||
of the datetime objects, as the Filter functionality will convert
|
||||
all datetime objects to there string forms using format_datetim()
|
||||
|
||||
This test makes sure all datetime comparisons are carried out correctly
|
||||
"""
|
||||
filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond"))
|
||||
filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z")
|
||||
|
||||
# check that filter value is converted from datetime to str
|
||||
assert isinstance(filter_with_dt_obj.value, str)
|
||||
|
||||
# compare datetime string to filter w/ datetime obj
|
||||
resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
|
||||
# compare datetime obj to filter w/ datetime obj
|
||||
resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered
|
||||
|
||||
# compare datetime string to filter w/ str
|
||||
resp = list(apply_common_filters(stix_objs, [filter_with_str]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
|
||||
# compare datetime obj to filter w/ str
|
||||
resp = list(apply_common_filters(real_stix_objs, [filter_with_str]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered
|
||||
|
||||
|
||||
def test_filters0():
|
||||
# "Return any object modified before 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == STIX_OBJS2[1]['id']
|
||||
assert len(resp) == 2
|
||||
|
||||
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
|
||||
assert resp[0].id == REAL_STIX_OBJS2[1].id
|
||||
assert len(resp) == 2
|
||||
|
||||
|
||||
def test_filters1():
|
||||
# "Return any object modified after 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
|
||||
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_filters2():
|
||||
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||
assert len(resp) == 3
|
||||
|
||||
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
|
||||
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||
assert len(resp) == 3
|
||||
|
||||
|
||||
def test_filters3():
|
||||
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == STIX_OBJS2[1]['id']
|
||||
assert len(resp) == 2
|
||||
|
||||
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
||||
fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))
|
||||
resp = list(apply_common_filters(REAL_STIX_OBJS2, [fv]))
|
||||
assert resp[0].id == REAL_STIX_OBJS2[1].id
|
||||
assert len(resp) == 2
|
||||
|
||||
|
||||
def test_filters4():
|
||||
# Assert invalid Filter cannot be created
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
|
||||
assert str(excinfo.value) == ("Filter operator '?' not supported "
|
||||
"for specified property: 'modified'")
|
||||
|
||||
|
||||
def test_filters5():
|
||||
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
|
||||
resp = list(apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
|
||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
|
||||
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_filters6():
|
||||
# Test filtering on non-common property
|
||||
resp = list(apply_common_filters(STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
|
||||
assert resp[0]['id'] == STIX_OBJS2[0]['id']
|
||||
assert len(resp) == 3
|
||||
|
||||
resp = list(apply_common_filters(REAL_STIX_OBJS2, [Filter("name", "=", "Malicious site hosting downloader")]))
|
||||
assert resp[0].id == REAL_STIX_OBJS2[0].id
|
||||
assert len(resp) == 3
|
||||
|
||||
|
||||
def test_filters7():
|
||||
# Test filtering on embedded property
|
||||
obsvd_data_obj = {
|
||||
"type": "observed-data",
|
||||
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
"created": "2016-04-06T19:58:16.000Z",
|
||||
"modified": "2016-04-06T19:58:16.000Z",
|
||||
"first_observed": "2015-12-21T19:00:00Z",
|
||||
"last_observed": "2015-12-21T19:00:00Z",
|
||||
"number_observed": 50,
|
||||
"objects": {
|
||||
"0": {
|
||||
"type": "file",
|
||||
"hashes": {
|
||||
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
|
||||
},
|
||||
"extensions": {
|
||||
"pdf-ext": {
|
||||
"version": "1.7",
|
||||
"document_info_dict": {
|
||||
"Title": "Sample document",
|
||||
"Author": "Adobe Systems Incorporated",
|
||||
"Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
|
||||
"Producer": "Acrobat Distiller 3.01 for Power Macintosh",
|
||||
"CreationDate": "20070412090123-02"
|
||||
},
|
||||
"pdfid0": "DFCE52BD827ECF765649852119D",
|
||||
"pdfid1": "57A1E0F9ED2AE523E313C"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stix_objects = list(STIX_OBJS2) + [obsvd_data_obj]
|
||||
real_stix_objects = list(REAL_STIX_OBJS2) + [parse(obsvd_data_obj)]
|
||||
|
||||
resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||
assert resp[0]['id'] == stix_objects[3]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||
assert resp[0].id == real_stix_objects[3].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_deduplicate():
|
||||
unique = deduplicate(STIX_OBJS1)
|
||||
|
||||
# Only 3 objects are unique
|
||||
# 2 id's vary
|
||||
# 2 modified times vary for a particular id
|
||||
|
||||
assert len(unique) == 3
|
||||
|
||||
ids = [obj['id'] for obj in unique]
|
||||
mods = [obj['modified'] for obj in unique]
|
||||
|
||||
assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids
|
||||
assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
|
||||
assert "2017-01-27T13:49:53.935Z" in mods
|
||||
assert "2017-01-27T13:49:53.936Z" in mods
|
||||
|
||||
|
||||
def test_add_remove_composite_datasource():
|
||||
cds = CompositeDataSource()
|
||||
ds1 = MemorySource()
|
||||
ds2 = MemorySource()
|
||||
ds3 = MemorySink()
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
cds.add_data_sources([ds1, ds2, ds1, ds3])
|
||||
assert str(excinfo.value) == ("DataSource (to be added) is not of type "
|
||||
"stix2.DataSource. DataSource type is '<class 'stix2.datastore.memory.MemorySink'>'")
|
||||
|
||||
cds.add_data_sources([ds1, ds2, ds1])
|
||||
|
||||
assert len(cds.get_all_data_sources()) == 2
|
||||
|
||||
cds.remove_data_sources([ds1.id, ds2.id])
|
||||
|
||||
assert len(cds.get_all_data_sources()) == 0
|
||||
|
||||
|
||||
def test_composite_datasource_operations():
|
||||
BUNDLE1 = dict(id="bundle--%s" % make_id(),
|
||||
objects=STIX_OBJS1,
|
||||
spec_version="2.0",
|
||||
type="bundle")
|
||||
cds1 = CompositeDataSource()
|
||||
ds1_1 = MemorySource(stix_data=BUNDLE1)
|
||||
ds1_2 = MemorySource(stix_data=STIX_OBJS2)
|
||||
|
||||
cds2 = CompositeDataSource()
|
||||
ds2_1 = MemorySource(stix_data=BUNDLE1)
|
||||
ds2_2 = MemorySource(stix_data=STIX_OBJS2)
|
||||
|
||||
cds1.add_data_sources([ds1_1, ds1_2])
|
||||
cds2.add_data_sources([ds2_1, ds2_2])
|
||||
|
||||
indicators = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
# In STIX_OBJS2 changed the 'modified' property to a later time...
|
||||
assert len(indicators) == 2
|
||||
|
||||
cds1.add_data_sources([cds2])
|
||||
|
||||
indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
|
||||
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
|
||||
assert indicator["type"] == "indicator"
|
||||
|
||||
query1 = [
|
||||
Filter("type", "=", "indicator")
|
||||
]
|
||||
|
||||
query2 = [
|
||||
Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z")
|
||||
]
|
||||
|
||||
cds1.filters.add(query2)
|
||||
|
||||
results = cds1.query(query1)
|
||||
|
||||
# STIX_OBJS2 has indicator with later time, one with different id, one with
|
||||
# original time in STIX_OBJS1
|
||||
assert len(results) == 3
|
||||
|
||||
indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
|
||||
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
|
||||
assert indicator["type"] == "indicator"
|
||||
|
||||
# There is only one indicator with different ID. Since we use the same data
|
||||
# when deduplicated, only two indicators (one with different modified).
|
||||
results = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert len(results) == 2
|
||||
|
||||
# Since we have filters already associated with our CompositeSource providing
|
||||
# nothing returns the same as cds1.query(query1) (the associated query is query2)
|
||||
results = cds1.query([])
|
||||
assert len(results) == 3
|
||||
ind = "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
|
||||
CompositeDataSource().add_data_sources(ind)
|
||||
assert "DataSource (to be added) is not of type stix2.DataSource. DataSource type is '{}'".format(type(ind)) == str(excinfo.value)
|
||||
|
||||
|
||||
def test_composite_datastore_no_datasource():
|
||||
cds = CompositeDataSource()
|
||||
|
||||
with pytest.raises(AttributeError) as excinfo:
|
||||
cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert 'CompositeDataSource has no data source' in str(excinfo.value)
|
||||
|
|
|
@ -7,10 +7,10 @@ import pytest
|
|||
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
|
||||
FileSystemSource, FileSystemStore, Filter, Identity,
|
||||
Indicator, Malware, Relationship, properties)
|
||||
|
||||
from .constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
|
||||
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS,
|
||||
MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS)
|
||||
from stix2.test.constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
|
||||
IDENTITY_KWARGS, INDICATOR_ID,
|
||||
INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
|
||||
RELATIONSHIP_IDS)
|
||||
|
||||
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
||||
|
|
@ -0,0 +1,463 @@
|
|||
import pytest
|
||||
|
||||
from stix2 import parse
|
||||
from stix2.datastore.filters import Filter, apply_common_filters
|
||||
from stix2.utils import STIXdatetime, parse_into_datetime
|
||||
|
||||
stix_objs = [
|
||||
{
|
||||
"created": "2017-01-27T13:49:53.997Z",
|
||||
"description": "\n\nTITLE:\n\tPoison Ivy",
|
||||
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
|
||||
"labels": [
|
||||
"remote-access-trojan"
|
||||
],
|
||||
"modified": "2017-01-27T13:49:53.997Z",
|
||||
"name": "Poison Ivy",
|
||||
"type": "malware"
|
||||
},
|
||||
{
|
||||
"created": "2014-05-08T09:00:00.000Z",
|
||||
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
|
||||
"labels": [
|
||||
"file-hash-watchlist"
|
||||
],
|
||||
"modified": "2014-05-08T09:00:00.000Z",
|
||||
"name": "File hash for Poison Ivy variant",
|
||||
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
|
||||
"type": "indicator",
|
||||
"valid_from": "2014-05-08T09:00:00.000000Z"
|
||||
},
|
||||
{
|
||||
"created": "2014-05-08T09:00:00.000Z",
|
||||
"granular_markings": [
|
||||
{
|
||||
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
|
||||
"selectors": [
|
||||
"relationship_type"
|
||||
]
|
||||
}
|
||||
],
|
||||
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
|
||||
"modified": "2014-05-08T09:00:00.000Z",
|
||||
"object_marking_refs": [
|
||||
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
|
||||
],
|
||||
"relationship_type": "indicates",
|
||||
"revoked": True,
|
||||
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
|
||||
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
|
||||
"type": "relationship"
|
||||
},
|
||||
{
|
||||
"id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
|
||||
"created": "2016-02-14T00:00:00.000Z",
|
||||
"created_by_ref": "identity--00000000-0000-0000-0000-b8e91df99dc9",
|
||||
"modified": "2016-02-14T00:00:00.000Z",
|
||||
"type": "vulnerability",
|
||||
"name": "CVE-2014-0160",
|
||||
"description": "The (1) TLS...",
|
||||
"external_references": [
|
||||
{
|
||||
"source_name": "cve",
|
||||
"external_id": "CVE-2014-0160"
|
||||
}
|
||||
],
|
||||
"labels": ["heartbleed", "has-logo"]
|
||||
},
|
||||
{
|
||||
"type": "observed-data",
|
||||
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
"created": "2016-04-06T19:58:16.000Z",
|
||||
"modified": "2016-04-06T19:58:16.000Z",
|
||||
"first_observed": "2015-12-21T19:00:00Z",
|
||||
"last_observed": "2015-12-21T19:00:00Z",
|
||||
"number_observed": 1,
|
||||
"objects": {
|
||||
"0": {
|
||||
"type": "file",
|
||||
"name": "HAL 9000.exe"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
filters = [
|
||||
Filter("type", "!=", "relationship"),
|
||||
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
|
||||
Filter("labels", "in", "remote-access-trojan"),
|
||||
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
|
||||
Filter("revoked", "=", True),
|
||||
Filter("revoked", "!=", True),
|
||||
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
|
||||
Filter("granular_markings.selectors", "in", "relationship_type"),
|
||||
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
|
||||
Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
|
||||
Filter("created_by_ref", "=", "identity--00000000-0000-0000-0000-b8e91df99dc9"),
|
||||
Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9"),
|
||||
Filter("granular_markings.selectors", "in", "description"),
|
||||
Filter("external_references.source_name", "=", "CVE"),
|
||||
Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}})
|
||||
]
|
||||
|
||||
# same as above objects but converted to real Python STIX2 objects
|
||||
# to test filters against true Python STIX2 objects
|
||||
real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs]
|
||||
|
||||
|
||||
def test_filter_ops_check():
|
||||
# invalid filters - non supported operators
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
# create Filter that has an operator that is not allowed
|
||||
Filter('modified', '*', 'not supported operator')
|
||||
assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'"
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
Filter("type", "%", "4")
|
||||
assert "Filter operator '%' not supported for specified property" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_filter_value_type_check():
|
||||
# invalid filters - non supported value types
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
Filter('created', '=', object())
|
||||
# On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`.
|
||||
assert any([s in str(excinfo.value) for s in ["<type 'object'>", "'<class 'object'>'"]])
|
||||
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
Filter("type", "=", complex(2, -1))
|
||||
assert any([s in str(excinfo.value) for s in ["<type 'complex'>", "'<class 'complex'>'"]])
|
||||
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
Filter("type", "=", set([16, 23]))
|
||||
assert any([s in str(excinfo.value) for s in ["<type 'set'>", "'<class 'set'>'"]])
|
||||
assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_filter_type_underscore_check():
|
||||
# check that Filters where property="type", value (name) doesnt have underscores
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
Filter("type", "=", "oh_underscore")
|
||||
assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_apply_common_filters0():
|
||||
# "Return any object whose type is not relationship"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[0]]))
|
||||
ids = [r['id'] for r in resp]
|
||||
assert stix_objs[0]['id'] in ids
|
||||
assert stix_objs[1]['id'] in ids
|
||||
assert stix_objs[3]['id'] in ids
|
||||
assert len(ids) == 4
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[0]]))
|
||||
ids = [r.id for r in resp]
|
||||
assert real_stix_objs[0].id in ids
|
||||
assert real_stix_objs[1].id in ids
|
||||
assert real_stix_objs[3].id in ids
|
||||
assert len(ids) == 4
|
||||
|
||||
|
||||
def test_apply_common_filters1():
|
||||
# "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[1]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[1]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters2():
|
||||
# "Return any object that contains remote-access-trojan in labels"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[2]]))
|
||||
assert resp[0]['id'] == stix_objs[0]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[2]]))
|
||||
assert resp[0].id == real_stix_objs[0].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters3():
|
||||
# "Return any object created after 2015-01-01T01:00:00.000Z"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[3]]))
|
||||
assert resp[0]['id'] == stix_objs[0]['id']
|
||||
assert len(resp) == 3
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[3]]))
|
||||
assert resp[0].id == real_stix_objs[0].id
|
||||
assert len(resp) == 3
|
||||
|
||||
|
||||
def test_apply_common_filters4():
|
||||
# "Return any revoked object"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[4]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[4]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters5():
|
||||
# "Return any object whose not revoked"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[5]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[5]]))
|
||||
assert len(resp) == 4
|
||||
|
||||
|
||||
def test_apply_common_filters6():
|
||||
# "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[6]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[6]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters7():
|
||||
# "Return any object that contains relationship_type in their selectors AND
|
||||
# also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]]))
|
||||
assert resp[0]['id'] == stix_objs[2]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]]))
|
||||
assert resp[0].id == real_stix_objs[2].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters8():
|
||||
# "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[9]]))
|
||||
assert resp[0]['id'] == stix_objs[3]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[9]]))
|
||||
assert resp[0].id == real_stix_objs[3].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters9():
|
||||
# "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[10]]))
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[10]]))
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_apply_common_filters10():
|
||||
# "Return any object that matches marking-definition--613f2e26-0000-0000-0000-b8e91df99dc9 in object_marking_refs" (None)
|
||||
resp = list(apply_common_filters(stix_objs, [filters[11]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[11]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
|
||||
def test_apply_common_filters11():
|
||||
# "Return any object that contains description in its selectors" (None)
|
||||
resp = list(apply_common_filters(stix_objs, [filters[12]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[12]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
|
||||
def test_apply_common_filters12():
|
||||
# "Return any object that matches CVE in source_name" (None, case sensitive)
|
||||
resp = list(apply_common_filters(stix_objs, [filters[13]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[13]]))
|
||||
assert len(resp) == 0
|
||||
|
||||
|
||||
def test_apply_common_filters13():
|
||||
# Return any object that matches file object in "objects"
|
||||
resp = list(apply_common_filters(stix_objs, [filters[14]]))
|
||||
assert resp[0]["id"] == stix_objs[4]["id"]
|
||||
assert len(resp) == 1
|
||||
# important additional check to make sure original File dict was
|
||||
# not converted to File object. (this was a deep bug found)
|
||||
assert isinstance(resp[0]["objects"]["0"], dict)
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs, [filters[14]]))
|
||||
assert resp[0].id == real_stix_objs[4].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_datetime_filter_behavior():
|
||||
"""if a filter is initialized with its value being a datetime object
|
||||
OR the STIX object property being filtered on is a datetime object, all
|
||||
resulting comparisons executed are done on the string representations
|
||||
of the datetime objects, as the Filter functionality will convert
|
||||
all datetime objects to there string forms using format_datetim()
|
||||
|
||||
This test makes sure all datetime comparisons are carried out correctly
|
||||
"""
|
||||
filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond"))
|
||||
filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z")
|
||||
|
||||
# check that filter value is converted from datetime to str
|
||||
assert isinstance(filter_with_dt_obj.value, str)
|
||||
|
||||
# compare datetime string to filter w/ datetime obj
|
||||
resp = list(apply_common_filters(stix_objs, [filter_with_dt_obj]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
|
||||
# compare datetime obj to filter w/ datetime obj
|
||||
resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered
|
||||
|
||||
# compare datetime string to filter w/ str
|
||||
resp = list(apply_common_filters(stix_objs, [filter_with_str]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
|
||||
# compare datetime obj to filter w/ str
|
||||
resp = list(apply_common_filters(real_stix_objs, [filter_with_str]))
|
||||
assert len(resp) == 1
|
||||
assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"
|
||||
assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered
|
||||
|
||||
|
||||
def test_filters0(stix_objs2, real_stix_objs2):
|
||||
# "Return any object modified before 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == stix_objs2[1]['id']
|
||||
assert len(resp) == 2
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
|
||||
assert resp[0].id == real_stix_objs2[1].id
|
||||
assert len(resp) == 2
|
||||
|
||||
|
||||
def test_filters1(stix_objs2, real_stix_objs2):
|
||||
# "Return any object modified after 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == stix_objs2[0]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))]))
|
||||
assert resp[0].id == real_stix_objs2[0].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_filters2(stix_objs2, real_stix_objs2):
|
||||
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == stix_objs2[0]['id']
|
||||
assert len(resp) == 3
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
|
||||
assert resp[0].id == real_stix_objs2[0].id
|
||||
assert len(resp) == 3
|
||||
|
||||
|
||||
def test_filters3(stix_objs2, real_stix_objs2):
|
||||
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
||||
resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]))
|
||||
assert resp[0]['id'] == stix_objs2[1]['id']
|
||||
assert len(resp) == 2
|
||||
|
||||
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
|
||||
fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))
|
||||
resp = list(apply_common_filters(real_stix_objs2, [fv]))
|
||||
assert resp[0].id == real_stix_objs2[1].id
|
||||
assert len(resp) == 2
|
||||
|
||||
|
||||
def test_filters4():
|
||||
# Assert invalid Filter cannot be created
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
Filter("modified", "?", "2017-01-27T13:49:53.935Z")
|
||||
assert str(excinfo.value) == ("Filter operator '?' not supported "
|
||||
"for specified property: 'modified'")
|
||||
|
||||
|
||||
def test_filters5(stix_objs2, real_stix_objs2):
|
||||
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
|
||||
resp = list(apply_common_filters(stix_objs2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
|
||||
assert resp[0]['id'] == stix_objs2[0]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]))
|
||||
assert resp[0].id == real_stix_objs2[0].id
|
||||
assert len(resp) == 1
|
||||
|
||||
|
||||
def test_filters6(stix_objs2, real_stix_objs2):
|
||||
# Test filtering on non-common property
|
||||
resp = list(apply_common_filters(stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")]))
|
||||
assert resp[0]['id'] == stix_objs2[0]['id']
|
||||
assert len(resp) == 3
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")]))
|
||||
assert resp[0].id == real_stix_objs2[0].id
|
||||
assert len(resp) == 3
|
||||
|
||||
|
||||
def test_filters7(stix_objs2, real_stix_objs2):
|
||||
# Test filtering on embedded property
|
||||
obsvd_data_obj = {
|
||||
"type": "observed-data",
|
||||
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
|
||||
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
|
||||
"created": "2016-04-06T19:58:16.000Z",
|
||||
"modified": "2016-04-06T19:58:16.000Z",
|
||||
"first_observed": "2015-12-21T19:00:00Z",
|
||||
"last_observed": "2015-12-21T19:00:00Z",
|
||||
"number_observed": 50,
|
||||
"objects": {
|
||||
"0": {
|
||||
"type": "file",
|
||||
"hashes": {
|
||||
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
|
||||
},
|
||||
"extensions": {
|
||||
"pdf-ext": {
|
||||
"version": "1.7",
|
||||
"document_info_dict": {
|
||||
"Title": "Sample document",
|
||||
"Author": "Adobe Systems Incorporated",
|
||||
"Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
|
||||
"Producer": "Acrobat Distiller 3.01 for Power Macintosh",
|
||||
"CreationDate": "20070412090123-02"
|
||||
},
|
||||
"pdfid0": "DFCE52BD827ECF765649852119D",
|
||||
"pdfid1": "57A1E0F9ED2AE523E313C"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stix_objects = list(stix_objs2) + [obsvd_data_obj]
|
||||
real_stix_objects = list(real_stix_objs2) + [parse(obsvd_data_obj)]
|
||||
|
||||
resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||
assert resp[0]['id'] == stix_objects[3]['id']
|
||||
assert len(resp) == 1
|
||||
|
||||
resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")]))
|
||||
assert resp[0].id == real_stix_objects[3].id
|
||||
assert len(resp) == 1
|
|
@ -0,0 +1,87 @@
|
|||
import pytest
|
||||
|
||||
from stix2.datastore import CompositeDataSource, make_id
|
||||
from stix2.datastore.filters import Filter
|
||||
from stix2.datastore.memory import MemorySink, MemorySource
|
||||
|
||||
|
||||
def test_add_remove_composite_datasource():
|
||||
cds = CompositeDataSource()
|
||||
ds1 = MemorySource()
|
||||
ds2 = MemorySource()
|
||||
ds3 = MemorySink()
|
||||
|
||||
with pytest.raises(TypeError) as excinfo:
|
||||
cds.add_data_sources([ds1, ds2, ds1, ds3])
|
||||
assert str(excinfo.value) == ("DataSource (to be added) is not of type "
|
||||
"stix2.DataSource. DataSource type is '<class 'stix2.datastore.memory.MemorySink'>'")
|
||||
|
||||
cds.add_data_sources([ds1, ds2, ds1])
|
||||
|
||||
assert len(cds.get_all_data_sources()) == 2
|
||||
|
||||
cds.remove_data_sources([ds1.id, ds2.id])
|
||||
|
||||
assert len(cds.get_all_data_sources()) == 0
|
||||
|
||||
|
||||
def test_composite_datasource_operations(stix_objs1, stix_objs2):
|
||||
BUNDLE1 = dict(id="bundle--%s" % make_id(),
|
||||
objects=stix_objs1,
|
||||
spec_version="2.0",
|
||||
type="bundle")
|
||||
cds1 = CompositeDataSource()
|
||||
ds1_1 = MemorySource(stix_data=BUNDLE1)
|
||||
ds1_2 = MemorySource(stix_data=stix_objs2)
|
||||
|
||||
cds2 = CompositeDataSource()
|
||||
ds2_1 = MemorySource(stix_data=BUNDLE1)
|
||||
ds2_2 = MemorySource(stix_data=stix_objs2)
|
||||
|
||||
cds1.add_data_sources([ds1_1, ds1_2])
|
||||
cds2.add_data_sources([ds2_1, ds2_2])
|
||||
|
||||
indicators = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
# In STIX_OBJS2 changed the 'modified' property to a later time...
|
||||
assert len(indicators) == 2
|
||||
|
||||
cds1.add_data_sources([cds2])
|
||||
|
||||
indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
|
||||
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
|
||||
assert indicator["type"] == "indicator"
|
||||
|
||||
query1 = [
|
||||
Filter("type", "=", "indicator")
|
||||
]
|
||||
|
||||
query2 = [
|
||||
Filter("valid_from", "=", "2017-01-27T13:49:53.935382Z")
|
||||
]
|
||||
|
||||
cds1.filters.add(query2)
|
||||
|
||||
results = cds1.query(query1)
|
||||
|
||||
# STIX_OBJS2 has indicator with later time, one with different id, one with
|
||||
# original time in STIX_OBJS1
|
||||
assert len(results) == 3
|
||||
|
||||
indicator = cds1.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
|
||||
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
|
||||
assert indicator["type"] == "indicator"
|
||||
|
||||
# There is only one indicator with different ID. Since we use the same data
|
||||
# when deduplicated, only two indicators (one with different modified).
|
||||
results = cds1.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
assert len(results) == 2
|
||||
|
||||
# Since we have filters already associated with our CompositeSource providing
|
||||
# nothing returns the same as cds1.query(query1) (the associated query is query2)
|
||||
results = cds1.query([])
|
||||
assert len(results) == 3
|
|
@ -0,0 +1,294 @@
|
|||
import json
|
||||
|
||||
from medallion.filters.basic_filter import BasicFilter
|
||||
import pytest
|
||||
from taxii2client import Collection, _filter_kwargs_to_query_params
|
||||
|
||||
from stix2 import (Bundle, TAXIICollectionSink, TAXIICollectionSource,
|
||||
TAXIICollectionStore, ThreatActor)
|
||||
from stix2.datastore.filters import Filter
|
||||
|
||||
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
|
||||
|
||||
|
||||
class MockTAXIICollectionEndpoint(Collection):
|
||||
"""Mock for taxii2_client.TAXIIClient"""
|
||||
|
||||
def __init__(self, url, **kwargs):
|
||||
super(MockTAXIICollectionEndpoint, self).__init__(url, **kwargs)
|
||||
self.objects = []
|
||||
|
||||
def add_objects(self, bundle):
|
||||
self._verify_can_write()
|
||||
if isinstance(bundle, str):
|
||||
bundle = json.loads(bundle)
|
||||
for object in bundle.get("objects", []):
|
||||
self.objects.append(object)
|
||||
|
||||
def get_objects(self, **filter_kwargs):
|
||||
self._verify_can_read()
|
||||
query_params = _filter_kwargs_to_query_params(filter_kwargs)
|
||||
if not isinstance(query_params, dict):
|
||||
query_params = json.loads(query_params)
|
||||
full_filter = BasicFilter(query_params or {})
|
||||
objs = full_filter.process_filter(
|
||||
self.objects,
|
||||
("id", "type", "version"),
|
||||
[]
|
||||
)
|
||||
return Bundle(objects=objs)
|
||||
|
||||
def get_object(self, id, version=None):
|
||||
self._verify_can_read()
|
||||
query_params = None
|
||||
if version:
|
||||
query_params = _filter_kwargs_to_query_params({"version": version})
|
||||
if query_params:
|
||||
query_params = json.loads(query_params)
|
||||
full_filter = BasicFilter(query_params or {})
|
||||
objs = full_filter.process_filter(
|
||||
self.objects,
|
||||
("version",),
|
||||
[]
|
||||
)
|
||||
return Bundle(objects=objs)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def collection(stix_objs1):
|
||||
mock = MockTAXIICollectionEndpoint(COLLECTION_URL, **{
|
||||
"id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
|
||||
"title": "Writable Collection",
|
||||
"description": "This collection is a dropbox for submitting indicators",
|
||||
"can_read": True,
|
||||
"can_write": True,
|
||||
"media_types": [
|
||||
"application/vnd.oasis.stix+json; version=2.0"
|
||||
]
|
||||
})
|
||||
|
||||
mock.objects.extend(stix_objs1)
|
||||
return mock
|
||||
|
||||
|
||||
def test_ds_taxii(collection):
|
||||
ds = TAXIICollectionSource(collection)
|
||||
assert ds.collection is not None
|
||||
|
||||
|
||||
def test_add_stix2_object(collection):
|
||||
tc_sink = TAXIICollectionSink(collection)
|
||||
|
||||
# create new STIX threat-actor
|
||||
ta = ThreatActor(name="Teddy Bear",
|
||||
labels=["nation-state"],
|
||||
sophistication="innovator",
|
||||
resource_level="government",
|
||||
goals=[
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector",
|
||||
])
|
||||
|
||||
tc_sink.add(ta)
|
||||
|
||||
|
||||
def test_add_stix2_with_custom_object(collection):
|
||||
tc_sink = TAXIICollectionStore(collection, allow_custom=True)
|
||||
|
||||
# create new STIX threat-actor
|
||||
ta = ThreatActor(name="Teddy Bear",
|
||||
labels=["nation-state"],
|
||||
sophistication="innovator",
|
||||
resource_level="government",
|
||||
goals=[
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector",
|
||||
],
|
||||
foo="bar",
|
||||
allow_custom=True)
|
||||
|
||||
tc_sink.add(ta)
|
||||
|
||||
|
||||
def test_add_list_object(collection, indicator):
|
||||
tc_sink = TAXIICollectionSink(collection)
|
||||
|
||||
# create new STIX threat-actor
|
||||
ta = ThreatActor(name="Teddy Bear",
|
||||
labels=["nation-state"],
|
||||
sophistication="innovator",
|
||||
resource_level="government",
|
||||
goals=[
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector",
|
||||
])
|
||||
|
||||
tc_sink.add([ta, indicator])
|
||||
|
||||
|
||||
def test_add_stix2_bundle_object(collection):
|
||||
tc_sink = TAXIICollectionSink(collection)
|
||||
|
||||
# create new STIX threat-actor
|
||||
ta = ThreatActor(name="Teddy Bear",
|
||||
labels=["nation-state"],
|
||||
sophistication="innovator",
|
||||
resource_level="government",
|
||||
goals=[
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector",
|
||||
])
|
||||
|
||||
tc_sink.add(Bundle(objects=[ta]))
|
||||
|
||||
|
||||
def test_add_str_object(collection):
|
||||
tc_sink = TAXIICollectionSink(collection)
|
||||
|
||||
# create new STIX threat-actor
|
||||
ta = """{
|
||||
"type": "threat-actor",
|
||||
"id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
|
||||
"created": "2018-04-23T16:40:50.847Z",
|
||||
"modified": "2018-04-23T16:40:50.847Z",
|
||||
"name": "Teddy Bear",
|
||||
"goals": [
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector"
|
||||
],
|
||||
"sophistication": "innovator",
|
||||
"resource_level": "government",
|
||||
"labels": [
|
||||
"nation-state"
|
||||
]
|
||||
}"""
|
||||
|
||||
tc_sink.add(ta)
|
||||
|
||||
|
||||
def test_add_dict_object(collection):
|
||||
tc_sink = TAXIICollectionSink(collection)
|
||||
|
||||
ta = {
|
||||
"type": "threat-actor",
|
||||
"id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
|
||||
"created": "2018-04-23T16:40:50.847Z",
|
||||
"modified": "2018-04-23T16:40:50.847Z",
|
||||
"name": "Teddy Bear",
|
||||
"goals": [
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector"
|
||||
],
|
||||
"sophistication": "innovator",
|
||||
"resource_level": "government",
|
||||
"labels": [
|
||||
"nation-state"
|
||||
]
|
||||
}
|
||||
|
||||
tc_sink.add(ta)
|
||||
|
||||
|
||||
def test_add_dict_bundle_object(collection):
|
||||
tc_sink = TAXIICollectionSink(collection)
|
||||
|
||||
ta = {
|
||||
"type": "bundle",
|
||||
"id": "bundle--860ccc8d-56c9-4fda-9384-84276fb52fb1",
|
||||
"spec_version": "2.0",
|
||||
"objects": [
|
||||
{
|
||||
"type": "threat-actor",
|
||||
"id": "threat-actor--dc5a2f41-f76e-425a-81fe-33afc7aabd75",
|
||||
"created": "2018-04-23T18:45:11.390Z",
|
||||
"modified": "2018-04-23T18:45:11.390Z",
|
||||
"name": "Teddy Bear",
|
||||
"goals": [
|
||||
"compromising environment NGOs",
|
||||
"water-hole attacks geared towards energy sector"
|
||||
],
|
||||
"sophistication": "innovator",
|
||||
"resource_level": "government",
|
||||
"labels": [
|
||||
"nation-state"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
tc_sink.add(ta)
|
||||
|
||||
|
||||
def test_get_stix2_object(collection):
|
||||
tc_sink = TAXIICollectionSource(collection)
|
||||
|
||||
objects = tc_sink.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||
|
||||
assert objects
|
||||
|
||||
|
||||
def test_parse_taxii_filters(collection):
|
||||
query = [
|
||||
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
|
||||
Filter("id", "=", "taxii stix object ID"),
|
||||
Filter("type", "=", "taxii stix object ID"),
|
||||
Filter("version", "=", "first"),
|
||||
Filter("created_by_ref", "=", "Bane"),
|
||||
]
|
||||
|
||||
taxii_filters_expected = [
|
||||
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
|
||||
Filter("id", "=", "taxii stix object ID"),
|
||||
Filter("type", "=", "taxii stix object ID"),
|
||||
Filter("version", "=", "first")
|
||||
]
|
||||
|
||||
ds = TAXIICollectionSource(collection)
|
||||
|
||||
taxii_filters = ds._parse_taxii_filters(query)
|
||||
|
||||
assert taxii_filters == taxii_filters_expected
|
||||
|
||||
|
||||
def test_add_get_remove_filter(collection):
|
||||
ds = TAXIICollectionSource(collection)
|
||||
|
||||
# First 3 filters are valid, remaining properties are erroneous in some way
|
||||
valid_filters = [
|
||||
Filter('type', '=', 'malware'),
|
||||
Filter('id', '!=', 'stix object id'),
|
||||
Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
|
||||
]
|
||||
|
||||
assert len(ds.filters) == 0
|
||||
|
||||
ds.filters.add(valid_filters[0])
|
||||
assert len(ds.filters) == 1
|
||||
|
||||
# Addin the same filter again will have no effect since `filters` acts
|
||||
# like a set
|
||||
ds.filters.add(valid_filters[0])
|
||||
assert len(ds.filters) == 1
|
||||
|
||||
ds.filters.add(valid_filters[1])
|
||||
assert len(ds.filters) == 2
|
||||
|
||||
ds.filters.add(valid_filters[2])
|
||||
assert len(ds.filters) == 3
|
||||
|
||||
assert valid_filters == [f for f in ds.filters]
|
||||
|
||||
# remove
|
||||
ds.filters.remove(valid_filters[0])
|
||||
|
||||
assert len(ds.filters) == 2
|
||||
|
||||
ds.filters.add(valid_filters)
|
||||
|
||||
|
||||
def test_get_all_versions(collection):
|
||||
ds = TAXIICollectionStore(collection)
|
||||
|
||||
indicators = ds.all_versions('indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f')
|
||||
# There are 3 indicators but 2 share the same 'modified' timestamp
|
||||
assert len(indicators) == 2
|
|
@ -82,3 +82,21 @@ def test_get_dict_invalid(data):
|
|||
])
|
||||
def test_get_type_from_id(stix_id, typ):
|
||||
assert stix2.utils.get_type_from_id(stix_id) == typ
|
||||
|
||||
|
||||
def test_deduplicate(stix_objs1):
|
||||
unique = stix2.utils.deduplicate(stix_objs1)
|
||||
|
||||
# Only 3 objects are unique
|
||||
# 2 id's vary
|
||||
# 2 modified times vary for a particular id
|
||||
|
||||
assert len(unique) == 3
|
||||
|
||||
ids = [obj['id'] for obj in unique]
|
||||
mods = [obj['modified'] for obj in unique]
|
||||
|
||||
assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids
|
||||
assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
|
||||
assert "2017-01-27T13:49:53.935Z" in mods
|
||||
assert "2017-01-27T13:49:53.936Z" in mods
|
||||
|
|
23
tox.ini
23
tox.ini
|
@ -3,15 +3,16 @@ envlist = py27,py34,py35,py36,style,isort-check
|
|||
|
||||
[testenv]
|
||||
deps =
|
||||
-U
|
||||
tox
|
||||
pytest
|
||||
pytest-cov
|
||||
coverage
|
||||
taxii2-client
|
||||
-U
|
||||
tox
|
||||
pytest
|
||||
pytest-cov
|
||||
coverage
|
||||
taxii2-client
|
||||
medallion
|
||||
commands =
|
||||
pytest --ignore=stix2/test/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing
|
||||
pytest stix2/test/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append
|
||||
pytest --ignore=stix2/test/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing
|
||||
pytest stix2/test/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append
|
||||
|
||||
passenv = CI TRAVIS TRAVIS_*
|
||||
|
||||
|
@ -22,13 +23,13 @@ commands =
|
|||
flake8
|
||||
|
||||
[flake8]
|
||||
max-line-length=160
|
||||
max-line-length = 160
|
||||
|
||||
[testenv:isort-check]
|
||||
deps = isort
|
||||
commands =
|
||||
isort -rc stix2 examples -df
|
||||
isort -rc stix2 examples -c
|
||||
isort -rc stix2 examples -df
|
||||
isort -rc stix2 examples -c
|
||||
|
||||
[travis]
|
||||
python =
|
||||
|
|
Loading…
Reference in New Issue