Separate out filesystem tests

stix2.0
Chris Lenk 2017-10-18 08:27:12 -04:00
parent 6deaddc04e
commit 84094e9f79
2 changed files with 256 additions and 221 deletions

View File

@ -1,18 +1,13 @@
import os
import shutil
import pytest
from taxii2client import Collection
from stix2 import (Bundle, Campaign, FileSystemSink, FileSystemSource,
FileSystemStore, Filter, MemorySource, MemoryStore)
from stix2 import Filter, MemorySource, MemoryStore
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2.sources.filters import apply_common_filters
from stix2.utils import deduplicate
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
class MockTAXIIClient(object):
@ -30,15 +25,6 @@ def ds():
return DataSource()
@pytest.fixture
def fs_store():
# create
yield FileSystemStore(FS_PATH)
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"))
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
@ -522,209 +508,3 @@ def test_composite_datasource_operations():
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
def test_filesytem_source():
# creation
fs_source = FileSystemSource(FS_PATH)
assert fs_source.stix_dir == FS_PATH
# get object
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
assert id_.name == "The MITRE Corporation"
assert id_.type == "identity"
# query
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
assert len(intrusion_sets) == 2
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
assert "DragonOK" in is_1.aliases
assert len(is_1.external_references) == 4
# query2
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
assert len(is_2) == 1
is_2 = is_2[0]
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
assert is_2.type == "attack-pattern"
def test_filesystem_sink():
# creation
fs_sink = FileSystemSink(FS_PATH)
assert fs_sink.stix_dir == FS_PATH
fs_source = FileSystemSource(FS_PATH)
# Test all the ways stix objects can be added (via different supplied forms)
# add python stix object
camp1 = Campaign(name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"])
fs_sink.add(camp1)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
camp1_r = fs_source.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == "Hannibal"
assert "War Elephant" in camp1_r.aliases
# add stix object dict
camp2 = {
"name": "Aurelius",
"type": "campaign",
"objective": "German and French Intelligence Services",
"aliases": ["Purple Robes"],
"id": "campaign--111111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add(camp2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
camp2_r = fs_source.get(camp2["id"])
assert camp2_r.id == camp2["id"]
assert camp2_r.name == camp2["name"]
assert "Purple Robes" in camp2_r.aliases
# add stix bundle dict
bund = {
"type": "bundle",
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
"spec_version": "2.0",
"objects": [
{
"name": "Atilla",
"type": "campaign",
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
"aliases": ["Huns"],
"id": "campaign--133111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
]
}
fs_sink.add(bund)
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
camp3_r = fs_source.get(bund["objects"][0]["id"])
assert camp3_r.id == bund["objects"][0]["id"]
assert camp3_r.name == bund["objects"][0]["name"]
assert "Huns" in camp3_r.aliases
# add json-encoded stix obj
camp4 = '{"type": "campaign", "id":"campaign--144111b6-1112-4fb0-111b-b111107ca70a",'\
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
fs_sink.add(camp4)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--144111b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp4_r = fs_source.get("campaign--144111b6-1112-4fb0-111b-b111107ca70a")
assert camp4_r.id == "campaign--144111b6-1112-4fb0-111b-b111107ca70a"
assert camp4_r.name == "Ghengis Khan"
# add json-encoded stix bundle
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
fs_sink.add(bund2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--155155b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp5_r = fs_source.get("campaign--155155b6-1112-4fb0-111b-b111107ca70a")
assert camp5_r.id == "campaign--155155b6-1112-4fb0-111b-b111107ca70a"
assert camp5_r.name == "Spartacus"
# add list of objects
camp6 = Campaign(name="Comanche",
objective="US Midwest manufacturing firms, oil refineries, and businesses",
aliases=["Horse Warrior"])
camp7 = {
"name": "Napolean",
"type": "campaign",
"objective": "Central and Eastern Europe military commands and departments",
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add([camp6, camp7])
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp6_r = fs_source.get(camp6.id)
assert camp6_r.id == camp6.id
assert "Horse Warrior" in camp6_r.aliases
camp7_r = fs_source.get(camp7["id"])
assert camp7_r.id == camp7["id"]
assert "The Frenchmen" in camp7_r.aliases
# remove all added objects
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
# remove campaign dir (that was added in course of testing)
os.rmdir(os.path.join(FS_PATH, "campaign"))
def test_filesystem_store(fs_store):
# get()
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
assert coa.type == "course-of-action"
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
assert rel.type == "relationship"
# query()
tools = fs_store.query([Filter("labels", "in", "tool")])
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
# add()
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
# remove
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
# remove campaign dir
os.rmdir(os.path.join(FS_PATH, "campaign"))
def test_filesystem_add_object_with_custom_property_in_bundle(fs_store):
bundle = Bundle()
fs_store.add(bundle)

View File

@ -0,0 +1,255 @@
import os
import shutil
import pytest
from stix2 import (Bundle, Campaign, FileSystemSink, FileSystemSource,
FileSystemStore, Filter)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@pytest.fixture
def fs_store():
# create
yield FileSystemStore(FS_PATH)
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def fs_source():
# create
fs = FileSystemSource(FS_PATH)
assert fs.stix_dir == FS_PATH
yield fs
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def fs_sink():
# create
fs = FileSystemSink(FS_PATH)
assert fs.stix_dir == FS_PATH
yield fs
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
def test_filesytem_source_get_object(fs_source):
# get object
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
def test_filesytem_source_all_versions(fs_source):
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
assert id_.name == "The MITRE Corporation"
assert id_.type == "identity"
def test_filesytem_source_query_single(fs_source):
# query2
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
assert len(is_2) == 1
is_2 = is_2[0]
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
assert is_2.type == "attack-pattern"
def test_filesytem_source_query_multiple(fs_source):
# query
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
assert len(intrusion_sets) == 2
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
assert "DragonOK" in is_1.aliases
assert len(is_1.external_references) == 4
def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
# add python stix object
camp1 = Campaign(name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"])
fs_sink.add(camp1)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
camp1_r = fs_source.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == "Hannibal"
assert "War Elephant" in camp1_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
# add stix object dict
camp2 = {
"name": "Aurelius",
"type": "campaign",
"objective": "German and French Intelligence Services",
"aliases": ["Purple Robes"],
"id": "campaign--111111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add(camp2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
camp2_r = fs_source.get(camp2["id"])
assert camp2_r.id == camp2["id"]
assert camp2_r.name == camp2["name"]
assert "Purple Robes" in camp2_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
# add stix bundle dict
bund = {
"type": "bundle",
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
"spec_version": "2.0",
"objects": [
{
"name": "Atilla",
"type": "campaign",
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
"aliases": ["Huns"],
"id": "campaign--133111b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
]
}
fs_sink.add(bund)
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
camp3_r = fs_source.get(bund["objects"][0]["id"])
assert camp3_r.id == bund["objects"][0]["id"]
assert camp3_r.name == bund["objects"][0]["name"]
assert "Huns" in camp3_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
# add json-encoded stix obj
camp4 = '{"type": "campaign", "id":"campaign--144111b6-1112-4fb0-111b-b111107ca70a",'\
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
fs_sink.add(camp4)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--144111b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp4_r = fs_source.get("campaign--144111b6-1112-4fb0-111b-b111107ca70a")
assert camp4_r.id == "campaign--144111b6-1112-4fb0-111b-b111107ca70a"
assert camp4_r.name == "Ghengis Khan"
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
# add json-encoded stix bundle
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
fs_sink.add(bund2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--155155b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp5_r = fs_source.get("campaign--155155b6-1112-4fb0-111b-b111107ca70a")
assert camp5_r.id == "campaign--155155b6-1112-4fb0-111b-b111107ca70a"
assert camp5_r.name == "Spartacus"
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
# add list of objects
camp6 = Campaign(name="Comanche",
objective="US Midwest manufacturing firms, oil refineries, and businesses",
aliases=["Horse Warrior"])
camp7 = {
"name": "Napolean",
"type": "campaign",
"objective": "Central and Eastern Europe military commands and departments",
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-111b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add([camp6, camp7])
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-111b-b111107ca70a" + ".json"))
camp6_r = fs_source.get(camp6.id)
assert camp6_r.id == camp6.id
assert "Horse Warrior" in camp6_r.aliases
camp7_r = fs_source.get(camp7["id"])
assert camp7_r.id == camp7["id"]
assert "The Frenchmen" in camp7_r.aliases
# remove all added objects
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
def test_filesystem_store_get(fs_store):
# get()
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
assert coa.type == "course-of-action"
def test_filesystem_store_all_versions(fs_store):
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
assert rel.type == "relationship"
def test_filesystem_store_query(fs_store):
# query()
tools = fs_store.query([Filter("labels", "in", "tool")])
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
def test_filesystem_store_add(fs_store):
# add()
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
# remove
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
def test_filesystem_add_object_with_custom_property_in_bundle(fs_store):
bundle = Bundle()
fs_store.add(bundle)