449 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			449 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
| import os
 | |
| import shutil
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from stix2 import Filter, MemorySource, MemoryStore, properties
 | |
| from stix2.datastore import make_id
 | |
| from stix2.utils import parse_into_datetime
 | |
| from stix2.v20 import (
 | |
|     Bundle, Campaign, CustomObject, Identity, Indicator, Malware, Relationship,
 | |
| )
 | |
| 
 | |
| from .constants import (
 | |
|     CAMPAIGN_ID, CAMPAIGN_KWARGS, IDENTITY_ID, IDENTITY_KWARGS, INDICATOR_ID,
 | |
|     INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS, RELATIONSHIP_IDS,
 | |
| )
 | |
| 
 | |
| IND1 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000001",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND2 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000001",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND3 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000001",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.936Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND4 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000002",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND5 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000002",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND6 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000001",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-31T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND7 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000002",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| IND8 = {
 | |
|     "created": "2017-01-27T13:49:53.935Z",
 | |
|     "id": "indicator--00000000-0000-4000-8000-000000000002",
 | |
|     "labels": [
 | |
|         "malicious-activity",
 | |
|     ],
 | |
|     "modified": "2017-01-27T13:49:53.935Z",
 | |
|     "name": "Malicious site hosting downloader",
 | |
|     "pattern": "[url:value = 'http://x4z9arb.cn/4712']",
 | |
|     "type": "indicator",
 | |
|     "valid_from": "2017-01-27T13:49:53.935382Z",
 | |
| }
 | |
| 
 | |
| STIX_OBJS2 = [IND6, IND7, IND8]
 | |
| STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def mem_store():
 | |
|     yield MemoryStore(STIX_OBJS1)
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def mem_source():
 | |
|     yield MemorySource(STIX_OBJS1)
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def rel_mem_store():
 | |
|     cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
 | |
|     idy = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
 | |
|     ind = Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
 | |
|     mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
 | |
|     rel1 = Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
 | |
|     rel2 = Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
 | |
|     rel3 = Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
 | |
|     stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3]
 | |
|     yield MemoryStore(stix_objs)
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def fs_mem_store(request, mem_store):
 | |
|     filename = mem_store.save_to_file('memory_test/mem_store.json')
 | |
| 
 | |
|     def fin():
 | |
|         # teardown, executed regardless of exception
 | |
|         shutil.rmtree(os.path.dirname(filename))
 | |
|     request.addfinalizer(fin)
 | |
| 
 | |
|     return filename
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def fs_mem_store_no_name(request, mem_store):
 | |
|     filename = mem_store.save_to_file('memory_test/')
 | |
| 
 | |
|     def fin():
 | |
|         # teardown, executed regardless of exception
 | |
|         shutil.rmtree(os.path.dirname(filename))
 | |
|     request.addfinalizer(fin)
 | |
| 
 | |
|     return filename
 | |
| 
 | |
| 
 | |
| def test_memory_source_get(mem_source):
 | |
|     resp = mem_source.get("indicator--00000000-0000-4000-8000-000000000001")
 | |
|     assert resp["id"] == "indicator--00000000-0000-4000-8000-000000000001"
 | |
| 
 | |
| 
 | |
| def test_memory_source_get_nonexistant_object(mem_source):
 | |
|     resp = mem_source.get("tool--8d0b222c-7a3b-44a0-b9c6-31b051efb32e")
 | |
|     assert resp is None
 | |
| 
 | |
| 
 | |
| def test_memory_store_all_versions(mem_store):
 | |
|     # Add bundle of items to sink
 | |
|     mem_store.add(
 | |
|         dict(
 | |
|             id="bundle--%s" % make_id(),
 | |
|             objects=STIX_OBJS2,
 | |
|             spec_version="2.0",
 | |
|             type="bundle",
 | |
|         ),
 | |
|     )
 | |
| 
 | |
|     resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001")
 | |
|     assert len(resp) == 3
 | |
| 
 | |
| 
 | |
| def test_memory_store_query(mem_store):
 | |
|     query = [Filter('type', '=', 'malware')]
 | |
|     resp = mem_store.query(query)
 | |
|     assert len(resp) == 0
 | |
| 
 | |
| 
 | |
| def test_memory_store_query_single_filter(mem_store):
 | |
|     query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001')
 | |
|     resp = mem_store.query(query)
 | |
|     assert len(resp) == 2
 | |
| 
 | |
| 
 | |
| def test_memory_store_query_empty_query(mem_store):
 | |
|     resp = mem_store.query()
 | |
|     # sort since returned in random order
 | |
|     resp = sorted(resp, key=lambda k: (k['id'], k['modified']))
 | |
|     assert len(resp) == 3
 | |
|     assert resp[0]['id'] == 'indicator--00000000-0000-4000-8000-000000000001'
 | |
|     assert resp[0]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z')
 | |
|     assert resp[1]['id'] == 'indicator--00000000-0000-4000-8000-000000000001'
 | |
|     assert resp[1]['modified'] == parse_into_datetime('2017-01-27T13:49:53.936Z')
 | |
|     assert resp[2]['id'] == 'indicator--00000000-0000-4000-8000-000000000002'
 | |
|     assert resp[2]['modified'] == parse_into_datetime('2017-01-27T13:49:53.935Z')
 | |
| 
 | |
| 
 | |
| def test_memory_store_query_multiple_filters(mem_store):
 | |
|     mem_store.source.filters.add(Filter('type', '=', 'indicator'))
 | |
|     query = Filter('id', '=', 'indicator--00000000-0000-4000-8000-000000000001')
 | |
|     resp = mem_store.query(query)
 | |
|     assert len(resp) == 2
 | |
| 
 | |
| 
 | |
| def test_memory_store_save_load_file(fs_mem_store):
 | |
|     filename = fs_mem_store  # the fixture fs_mem_store yields filename where the memory store was written to
 | |
| 
 | |
|     # STIX2 contents of mem_store have already been written to file
 | |
|     # (this is done in fixture 'fs_mem_store'), so can already read-in here
 | |
|     contents = open(os.path.abspath(filename)).read()
 | |
| 
 | |
|     assert '"id": "indicator--00000000-0000-4000-8000-000000000001",' in contents
 | |
|     assert '"id": "indicator--00000000-0000-4000-8000-000000000001",' in contents
 | |
| 
 | |
|     mem_store2 = MemoryStore()
 | |
|     mem_store2.load_from_file(filename)
 | |
|     assert mem_store2.get("indicator--00000000-0000-4000-8000-000000000001")
 | |
|     assert mem_store2.get("indicator--00000000-0000-4000-8000-000000000001")
 | |
| 
 | |
| 
 | |
| def test_memory_store_save_load_file_no_name_provided(fs_mem_store_no_name):
 | |
|     filename = fs_mem_store_no_name  # the fixture fs_mem_store yields filename where the memory store was written to
 | |
| 
 | |
|     # STIX2 contents of mem_store have already been written to file
 | |
|     # (this is done in fixture 'fs_mem_store'), so can already read-in here
 | |
|     contents = open(os.path.abspath(filename)).read()
 | |
| 
 | |
|     assert '"id": "indicator--00000000-0000-4000-8000-000000000001",' in contents
 | |
|     assert '"id": "indicator--00000000-0000-4000-8000-000000000001",' in contents
 | |
| 
 | |
|     mem_store2 = MemoryStore()
 | |
|     mem_store2.load_from_file(filename)
 | |
|     assert mem_store2.get("indicator--00000000-0000-4000-8000-000000000001")
 | |
|     assert mem_store2.get("indicator--00000000-0000-4000-8000-000000000001")
 | |
| 
 | |
| 
 | |
| def test_memory_store_add_invalid_object(mem_store):
 | |
|     ind = ('indicator', IND1)  # tuple isn't valid
 | |
|     with pytest.raises(TypeError):
 | |
|         mem_store.add(ind)
 | |
| 
 | |
| 
 | |
| def test_memory_store_object_with_custom_property(mem_store):
 | |
|     camp = Campaign(
 | |
|         name="Scipio Africanus",
 | |
|         objective="Defeat the Carthaginians",
 | |
|         x_empire="Roman",
 | |
|         allow_custom=True,
 | |
|     )
 | |
| 
 | |
|     mem_store.add(camp)
 | |
| 
 | |
|     camp_r = mem_store.get(camp.id)
 | |
|     assert camp_r.id == camp.id
 | |
|     assert camp_r.x_empire == camp.x_empire
 | |
| 
 | |
| 
 | |
| def test_memory_store_object_creator_of_present(mem_store):
 | |
|     camp = Campaign(
 | |
|         name="Scipio Africanus",
 | |
|         objective="Defeat the Carthaginians",
 | |
|         created_by_ref=IDENTITY_ID,
 | |
|         x_empire="Roman",
 | |
|         allow_custom=True,
 | |
|     )
 | |
| 
 | |
|     iden = Identity(
 | |
|         id=IDENTITY_ID,
 | |
|         name="Foo Corp.",
 | |
|         identity_class="organization",
 | |
|     )
 | |
| 
 | |
|     mem_store.add(camp)
 | |
|     mem_store.add(iden)
 | |
| 
 | |
|     camp_r = mem_store.get(camp.id)
 | |
|     assert camp_r.id == camp.id
 | |
|     assert camp_r.x_empire == camp.x_empire
 | |
|     assert mem_store.creator_of(camp_r) == iden
 | |
| 
 | |
| 
 | |
| def test_memory_store_object_creator_of_missing(mem_store):
 | |
|     camp = Campaign(
 | |
|         name="Scipio Africanus",
 | |
|         objective="Defeat the Carthaginians",
 | |
|         x_empire="Roman",
 | |
|         allow_custom=True,
 | |
|     )
 | |
| 
 | |
|     mem_store.add(camp)
 | |
| 
 | |
|     camp_r = mem_store.get(camp.id)
 | |
|     assert camp_r.id == camp.id
 | |
|     assert camp_r.x_empire == camp.x_empire
 | |
|     assert mem_store.creator_of(camp) is None
 | |
| 
 | |
| 
 | |
| def test_memory_store_object_with_custom_property_in_bundle(mem_store):
 | |
|     camp = Campaign(
 | |
|         name="Scipio Africanus",
 | |
|         objective="Defeat the Carthaginians",
 | |
|         x_empire="Roman",
 | |
|         allow_custom=True,
 | |
|     )
 | |
| 
 | |
|     bundle = Bundle(camp, allow_custom=True)
 | |
|     mem_store.add(bundle)
 | |
| 
 | |
|     camp_r = mem_store.get(camp.id)
 | |
|     assert camp_r.id == camp.id
 | |
|     assert camp_r.x_empire == camp.x_empire
 | |
| 
 | |
| 
 | |
| def test_memory_store_custom_object(mem_store):
 | |
|     @CustomObject(
 | |
|         'x-new-obj-3', [
 | |
|             ('property1', properties.StringProperty(required=True)),
 | |
|         ],
 | |
|     )
 | |
|     class NewObj():
 | |
|         pass
 | |
| 
 | |
|     newobj = NewObj(property1='something')
 | |
|     mem_store.add(newobj)
 | |
| 
 | |
|     newobj_r = mem_store.get(newobj.id)
 | |
|     assert newobj_r.id == newobj.id
 | |
|     assert newobj_r.property1 == 'something'
 | |
| 
 | |
| 
 | |
| def test_relationships(rel_mem_store):
 | |
|     mal = rel_mem_store.get(MALWARE_ID)
 | |
|     resp = rel_mem_store.relationships(mal)
 | |
| 
 | |
|     assert len(resp) == 3
 | |
|     assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
 | |
|     assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
 | |
|     assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
 | |
| 
 | |
| 
 | |
| def test_relationships_by_type(rel_mem_store):
 | |
|     mal = rel_mem_store.get(MALWARE_ID)
 | |
|     resp = rel_mem_store.relationships(mal, relationship_type='indicates')
 | |
| 
 | |
|     assert len(resp) == 1
 | |
|     assert resp[0]['id'] == RELATIONSHIP_IDS[0]
 | |
| 
 | |
| 
 | |
| def test_relationships_by_source(rel_mem_store):
 | |
|     resp = rel_mem_store.relationships(MALWARE_ID, source_only=True)
 | |
| 
 | |
|     assert len(resp) == 1
 | |
|     assert resp[0]['id'] == RELATIONSHIP_IDS[1]
 | |
| 
 | |
| 
 | |
| def test_relationships_by_target(rel_mem_store):
 | |
|     resp = rel_mem_store.relationships(MALWARE_ID, target_only=True)
 | |
| 
 | |
|     assert len(resp) == 2
 | |
|     assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
 | |
|     assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
 | |
| 
 | |
| 
 | |
| def test_relationships_by_target_and_type(rel_mem_store):
 | |
|     resp = rel_mem_store.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
 | |
| 
 | |
|     assert len(resp) == 1
 | |
|     assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
 | |
| 
 | |
| 
 | |
| def test_relationships_by_target_and_source(rel_mem_store):
 | |
|     with pytest.raises(ValueError) as excinfo:
 | |
|         rel_mem_store.relationships(MALWARE_ID, target_only=True, source_only=True)
 | |
| 
 | |
|     assert 'not both' in str(excinfo.value)
 | |
| 
 | |
| 
 | |
| def test_related_to(rel_mem_store):
 | |
|     mal = rel_mem_store.get(MALWARE_ID)
 | |
|     resp = rel_mem_store.related_to(mal)
 | |
| 
 | |
|     assert len(resp) == 3
 | |
|     assert any(x['id'] == CAMPAIGN_ID for x in resp)
 | |
|     assert any(x['id'] == INDICATOR_ID for x in resp)
 | |
|     assert any(x['id'] == IDENTITY_ID for x in resp)
 | |
| 
 | |
| 
 | |
| def test_related_to_by_source(rel_mem_store):
 | |
|     resp = rel_mem_store.related_to(MALWARE_ID, source_only=True)
 | |
| 
 | |
|     assert len(resp) == 1
 | |
|     assert any(x['id'] == IDENTITY_ID for x in resp)
 | |
| 
 | |
| 
 | |
| def test_related_to_by_target(rel_mem_store):
 | |
|     resp = rel_mem_store.related_to(MALWARE_ID, target_only=True)
 | |
| 
 | |
|     assert len(resp) == 2
 | |
|     assert any(x['id'] == CAMPAIGN_ID for x in resp)
 | |
|     assert any(x['id'] == INDICATOR_ID for x in resp)
 | |
| 
 | |
| 
 | |
| def test_object_family_internal_components(mem_source):
 | |
|     # Testing internal components.
 | |
|     str_representation = str(mem_source._data['indicator--00000000-0000-4000-8000-000000000001'])
 | |
|     repr_representation = repr(mem_source._data['indicator--00000000-0000-4000-8000-000000000001'])
 | |
| 
 | |
|     assert "latest=2017-01-27 13:49:53.936000+00:00>>" in str_representation
 | |
|     assert "latest=2017-01-27 13:49:53.936000+00:00>>" in repr_representation
 | |
| 
 | |
| 
 | |
| def test_unversioned_objects(mem_store):
 | |
|     marking = {
 | |
|         "type": "marking-definition",
 | |
|         "id": "marking-definition--48e83cde-e902-4404-85b3-6e81f75ccb62",
 | |
|         "created": "1988-01-02T16:44:04.000Z",
 | |
|         "definition_type": "statement",
 | |
|         "definition": {
 | |
|             "statement": "Copyright (C) ACME Corp.",
 | |
|         },
 | |
|     }
 | |
| 
 | |
|     mem_store.add(marking)
 | |
| 
 | |
|     obj = mem_store.get(marking["id"])
 | |
|     assert obj["id"] == marking["id"]
 | |
| 
 | |
|     objs = mem_store.all_versions(marking["id"])
 | |
|     assert len(objs) == 1
 | |
|     assert objs[0]["id"] == marking["id"]
 |