cti-python-stix2/stix2/test/test_datastore_filesystem.py

530 lines
18 KiB
Python

import json
import os
import shutil
import pytest
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
FileSystemSource, FileSystemStore, Filter, Identity,
Indicator, Malware, Relationship, properties)
from stix2.test.constants import (CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID,
INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
RELATIONSHIP_IDS)
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
@pytest.fixture
def fs_store():
# create
yield FileSystemStore(FS_PATH)
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def fs_source():
# create
fs = FileSystemSource(FS_PATH)
assert fs.stix_dir == FS_PATH
yield fs
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def fs_sink():
# create
fs = FileSystemSink(FS_PATH)
assert fs.stix_dir == FS_PATH
yield fs
# remove campaign dir
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
@pytest.fixture
def bad_json_files():
# create erroneous JSON files for tests to make sure handled gracefully
with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-json.txt"), "w+") as f:
f.write("Im not a JSON file")
with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-bad-json.json"), "w+") as f:
f.write("Im not a JSON formatted file")
yield True # dummy yield so can have teardown
os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-json.txt"))
os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-bad-json.json"))
@pytest.fixture
def bad_stix_files():
# create erroneous STIX JSON files for tests to make sure handled correctly
# bad STIX object
stix_obj = {
"id": "intrusion-set--test-bad-stix",
"spec_version": "2.0"
# no "type" field
}
with open(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-stix.json"), "w+") as f:
f.write(json.dumps(stix_obj))
yield True # dummy yield so can have teardown
os.remove(os.path.join(FS_PATH, "intrusion-set", "intrusion-set--test-non-stix.json"))
@pytest.fixture(scope='module')
def rel_fs_store():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
fs = FileSystemStore(FS_PATH)
for o in stix_objs:
fs.add(o)
yield fs
for o in stix_objs:
os.remove(os.path.join(FS_PATH, o.type, o.id + '.json'))
def test_filesystem_source_nonexistent_folder():
with pytest.raises(ValueError) as excinfo:
FileSystemSource('nonexistent-folder')
assert "for STIX data does not exist" in str(excinfo)
def test_filesystem_sink_nonexistent_folder():
with pytest.raises(ValueError) as excinfo:
FileSystemSink('nonexistent-folder')
assert "for STIX data does not exist" in str(excinfo)
def test_filesystem_source_bad_json_file(fs_source, bad_json_files):
# this tests the handling of two bad json files
# - one file should just be skipped (silently) as its a ".txt" extension
# - one file should be parsed and raise Exception bc its not JSON
try:
fs_source.get("intrusion-set--test-bad-json")
except TypeError as e:
assert "intrusion-set--test-bad-json" in str(e)
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files):
# this tests handling of bad STIX json object
try:
fs_source.get("intrusion-set--test-non-stix")
except TypeError as e:
assert "intrusion-set--test-non-stix" in str(e)
assert "could either not be parsed to JSON or was not valid STIX JSON" in str(e)
def test_filesytem_source_get_object(fs_source):
# get object
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
assert mal.name == "Rover"
def test_filesytem_source_get_nonexistent_object(fs_source):
ind = fs_source.get("indicator--6b616fc1-1505-48e3-8b2c-0d19337bff38")
assert ind is None
def test_filesytem_source_all_versions(fs_source):
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
assert id_.name == "The MITRE Corporation"
assert id_.type == "identity"
def test_filesytem_source_query_single(fs_source):
# query2
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
assert len(is_2) == 1
is_2 = is_2[0]
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
assert is_2.type == "attack-pattern"
def test_filesytem_source_query_multiple(fs_source):
# query
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
assert len(intrusion_sets) == 2
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
assert "DragonOK" in is_1.aliases
assert len(is_1.external_references) == 4
def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
# add python stix object
camp1 = Campaign(name="Hannibal",
objective="Targeting Italian and Spanish Diplomat internet accounts",
aliases=["War Elephant"])
fs_sink.add(camp1)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
camp1_r = fs_source.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == "Hannibal"
assert "War Elephant" in camp1_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
# add stix object dict
camp2 = {
"name": "Aurelius",
"type": "campaign",
"objective": "German and French Intelligence Services",
"aliases": ["Purple Robes"],
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add(camp2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
camp2_r = fs_source.get(camp2["id"])
assert camp2_r.id == camp2["id"]
assert camp2_r.name == camp2["name"]
assert "Purple Robes" in camp2_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
# add stix bundle dict
bund = {
"type": "bundle",
"id": "bundle--040ae5ec-2e91-4e94-b075-bc8b368e8ca3",
"spec_version": "2.0",
"objects": [
{
"name": "Atilla",
"type": "campaign",
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
"aliases": ["Huns"],
"id": "campaign--b8f86161-ccae-49de-973a-4ca320c62478",
"created": "2017-05-31T21:31:53.197755Z"
}
]
}
fs_sink.add(bund)
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
camp3_r = fs_source.get(bund["objects"][0]["id"])
assert camp3_r.id == bund["objects"][0]["id"]
assert camp3_r.name == bund["objects"][0]["name"]
assert "Huns" in camp3_r.aliases
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
# add json-encoded stix obj
camp4 = '{"type": "campaign", "id":"campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d",'\
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
fs_sink.add(camp4)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d" + ".json"))
camp4_r = fs_source.get("campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d")
assert camp4_r.id == "campaign--6a6ca372-ba07-42cc-81ef-9840fc1f963d"
assert camp4_r.name == "Ghengis Khan"
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
# add json-encoded stix bundle
bund2 = '{"type": "bundle", "id": "bundle--3d267103-8475-4d8f-b321-35ec6eccfa37",' \
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b",' \
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
fs_sink.add(bund2)
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b" + ".json"))
camp5_r = fs_source.get("campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b")
assert camp5_r.id == "campaign--2c03b8bf-82ee-433e-9918-ca2cb6e9534b"
assert camp5_r.name == "Spartacus"
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
# add list of objects
camp6 = Campaign(name="Comanche",
objective="US Midwest manufacturing firms, oil refineries, and businesses",
aliases=["Horse Warrior"])
camp7 = {
"name": "Napolean",
"type": "campaign",
"objective": "Central and Eastern Europe military commands and departments",
"aliases": ["The Frenchmen"],
"id": "campaign--122818b6-1112-4fb0-b11b-b111107ca70a",
"created": "2017-05-31T21:31:53.197755Z"
}
fs_sink.add([camp6, camp7])
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-b11b-b111107ca70a" + ".json"))
camp6_r = fs_source.get(camp6.id)
assert camp6_r.id == camp6.id
assert "Horse Warrior" in camp6_r.aliases
camp7_r = fs_source.get(camp7["id"])
assert camp7_r.id == camp7["id"]
assert "The Frenchmen" in camp7_r.aliases
# remove all added objects
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
def test_filesystem_store_get_stored_as_bundle(fs_store):
coa = fs_store.get("course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f")
assert coa.id == "course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f"
assert coa.type == "course-of-action"
def test_filesystem_store_get_stored_as_object(fs_store):
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
assert coa.type == "course-of-action"
def test_filesystem_store_all_versions(fs_store):
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
assert rel.type == "relationship"
def test_filesystem_store_query(fs_store):
# query()
tools = fs_store.query([Filter("labels", "in", "tool")])
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
def test_filesystem_store_query_single_filter(fs_store):
query = Filter("labels", "in", "tool")
tools = fs_store.query(query)
assert len(tools) == 2
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
def test_filesystem_store_empty_query(fs_store):
results = fs_store.query() # returns all
assert len(results) == 26
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results]
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results]
def test_filesystem_store_query_multiple_filters(fs_store):
fs_store.source.filters.add(Filter("labels", "in", "tool"))
tools = fs_store.query(Filter("id", "=", "tool--242f3da3-4425-4d11-8f5c-b842886da966"))
assert len(tools) == 1
assert tools[0].id == "tool--242f3da3-4425-4d11-8f5c-b842886da966"
def test_filesystem_store_query_dont_include_type_folder(fs_store):
results = fs_store.query(Filter("type", "!=", "tool"))
assert len(results) == 24
def test_filesystem_store_add(fs_store):
# add()
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
# remove
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
def test_filesystem_store_add_as_bundle():
fs_store = FileSystemStore(FS_PATH, bundlify=True)
camp1 = Campaign(name="Great Heathen Army",
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
aliases=["Ragnar"])
fs_store.add(camp1)
with open(os.path.join(FS_PATH, "campaign", camp1.id + ".json")) as bundle_file:
assert '"type": "bundle"' in bundle_file.read()
camp1_r = fs_store.get(camp1.id)
assert camp1_r.id == camp1.id
assert camp1_r.name == camp1.name
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
def test_filesystem_add_bundle_object(fs_store):
bundle = Bundle()
fs_store.add(bundle)
def test_filesystem_store_add_invalid_object(fs_store):
ind = ('campaign', 'campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f') # tuple isn't valid
with pytest.raises(TypeError) as excinfo:
fs_store.add(ind)
assert 'stix_data must be' in str(excinfo.value)
assert 'a STIX object' in str(excinfo.value)
assert 'JSON formatted STIX' in str(excinfo.value)
assert 'JSON formatted STIX bundle' in str(excinfo.value)
def test_filesystem_object_with_custom_property(fs_store):
camp = Campaign(name="Scipio Africanus",
objective="Defeat the Carthaginians",
x_empire="Roman",
allow_custom=True)
fs_store.add(camp, True)
camp_r = fs_store.get(camp.id)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
def test_filesystem_object_with_custom_property_in_bundle(fs_store):
camp = Campaign(name="Scipio Africanus",
objective="Defeat the Carthaginians",
x_empire="Roman",
allow_custom=True)
bundle = Bundle(camp, allow_custom=True)
fs_store.add(bundle)
camp_r = fs_store.get(camp.id)
assert camp_r.id == camp.id
assert camp_r.x_empire == camp.x_empire
def test_filesystem_custom_object(fs_store):
@CustomObject('x-new-obj', [
('property1', properties.StringProperty(required=True)),
])
class NewObj():
pass
newobj = NewObj(property1='something')
fs_store.add(newobj)
newobj_r = fs_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
def test_relationships(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.relationships(mal)
assert len(resp) == 3
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_type(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.relationships(mal, relationship_type='indicates')
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[0]
def test_relationships_by_source(rel_fs_store):
resp = rel_fs_store.relationships(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert resp[0]['id'] == RELATIONSHIP_IDS[1]
def test_relationships_by_target(rel_fs_store):
resp = rel_fs_store.relationships(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_type(rel_fs_store):
resp = rel_fs_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
assert len(resp) == 1
assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
def test_relationships_by_target_and_source(rel_fs_store):
with pytest.raises(ValueError) as excinfo:
rel_fs_store.relationships(MALWARE_ID, target_only=True, source_only=True)
assert 'not both' in str(excinfo.value)
def test_related_to(rel_fs_store):
mal = rel_fs_store.get(MALWARE_ID)
resp = rel_fs_store.related_to(mal)
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_source(rel_fs_store):
resp = rel_fs_store.related_to(MALWARE_ID, source_only=True)
assert len(resp) == 1
assert any(x['id'] == IDENTITY_ID for x in resp)
def test_related_to_by_target(rel_fs_store):
resp = rel_fs_store.related_to(MALWARE_ID, target_only=True)
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)