Merge pull request from chisholm/malware2.1

Malware2.1
stix2.1
Greg Back 2018-06-15 08:22:35 -05:00 committed by GitHub
commit 78c4d48bd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 297 additions and 131 deletions

View File

@ -24,11 +24,18 @@
v21.observables
v21.sdo
v21.sro
The .v21 import can't be relocated, or we get circular import problems.
The 'isort:skip' line comment didn't work to skip only that one problematic
import. The only thing that did was telling it to skip the whole file.
isort:skip_file
"""
# flake8: noqa
from .core import Bundle, _collect_stix2_obj_maps, _register_type, parse
from .core import _collect_stix2_obj_maps, _register_type, parse
from .v21 import * # This import will always be the latest STIX 2.X version
from .datastore import CompositeDataSource
from .datastore.filesystem import (FileSystemSink, FileSystemSource,
FileSystemStore)
@ -59,7 +66,6 @@ from .patterns import (AndBooleanExpression, AndObservationExpression,
StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier)
from .utils import new_version, revoke
from .v21 import * # This import will always be the latest STIX 2.X version
from .version import __version__
_collect_stix2_obj_maps()

View File

@ -1,72 +1,10 @@
"""STIX 2.X Objects that are neither SDOs nor SROs."""
from collections import OrderedDict
import importlib
import pkgutil
import stix2
from . import exceptions
from .base import _STIXBase
from .properties import IDProperty, ListProperty, Property, TypeProperty
from .utils import _get_dict, get_class_hierarchy_names
class STIXObjectProperty(Property):
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(STIXObjectProperty, self).__init__(*args, **kwargs)
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
for x in get_class_hierarchy_names(value)):
return value
try:
dictified = _get_dict(value)
except ValueError:
raise ValueError("This property may only contain a dictionary or object")
if dictified == {}:
raise ValueError("This property may only contain a non-empty dictionary or object")
if 'type' in dictified and dictified['type'] == 'bundle':
raise ValueError('This property may not contain a Bundle object')
if self.allow_custom:
parsed_obj = parse(dictified, allow_custom=True)
else:
parsed_obj = parse(dictified)
return parsed_obj
class Bundle(_STIXBase):
"""For more detailed information on this object's properties, see
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part1-stix-core/stix-v2.0-cs01-part1-stix-core.html#_Toc496709293>`__.
"""
_type = 'bundle'
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('spec_version', Property(fixed="2.0")),
('objects', ListProperty(STIXObjectProperty)),
])
def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg.
if args:
if isinstance(args[0], list):
kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', [])
else:
kwargs['objects'] = list(args) + kwargs.get('objects', [])
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False)
super(Bundle, self).__init__(**kwargs)
from .utils import _get_dict
STIX2_OBJ_MAPS = {}
@ -79,8 +17,10 @@ def parse(data, allow_custom=False, version=None):
allow_custom (bool): Whether to allow custom properties as well unknown
custom objects. Note that unknown custom objects cannot be parsed
into STIX objects, and will be returned as is. Default: False.
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
None, use latest version.
version (str): Only used for bundles. If the spec_version property is
missing, it is ambiguous what spec should be used to parse the
bundle. In this case, this version parameter gives the spec
version to use.
Returns:
An instantiated Python STIX object.
@ -112,6 +52,10 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
allow_custom (bool): Whether to allow custom properties as well unknown
custom objects. Note that unknown custom objects cannot be parsed
into STIX objects, and will be returned as is. Default: False.
version: Only used for bundles. If the spec_version property is
missing, it is ambiguous what spec should be used to parse the
bundle. In this case, this version parameter gives the spec
version to use.
Returns:
An instantiated Python STIX object
@ -124,17 +68,24 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
STIX objects that I dont know about ahead of time)
"""
if not version:
# Use latest version
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
else:
v = 'v' + version.replace('.', '')
OBJ_MAP = STIX2_OBJ_MAPS[v]
if 'type' not in stix_dict:
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(stix_dict))
if "spec_version" in stix_dict:
# For STIX 2.0, applies to bundles only.
# For STIX 2.1+, applies to SDOs, SROs, and markings only.
v = 'v' + stix_dict["spec_version"].replace('.', '')
elif stix_dict["type"] == "bundle":
# bundles without spec_version are ambiguous.
if version:
v = 'v' + version.replace('.', '')
else:
v = 'v' + stix2.DEFAULT_VERSION.replace('.', '')
else:
v = 'v20'
OBJ_MAP = STIX2_OBJ_MAPS[v]
try:
obj_class = OBJ_MAP[stix_dict['type']]
except KeyError:

View File

@ -6,7 +6,8 @@ Python STIX 2.0 FileSystem Source/Sink
import json
import os
from stix2.core import Bundle, parse
from stix2 import Bundle
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
from stix2.utils import deduplicate, get_class_hierarchy_names

View File

@ -15,8 +15,9 @@ Note:
import json
import os
from stix2 import Bundle
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.core import parse
from stix2.datastore import DataSink, DataSource, DataStoreMixin
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters
@ -286,7 +287,7 @@ class MemorySource(DataSource):
if stix_data["type"] == "bundle":
for stix_obj in stix_data["objects"]:
_add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom, version=stix_data["spec_version"]))
_add(self, stix_data=parse(stix_obj, allow_custom=self.allow_custom))
else:
_add(self, stix_data=parse(stix_data, allow_custom=self.allow_custom, version=version))
load_from_file.__doc__ = MemoryStore.load_from_file.__doc__

View File

@ -3,8 +3,9 @@ Python STIX 2.x TAXIICollectionStore
"""
from requests.exceptions import HTTPError
from stix2 import Bundle
from stix2.base import _STIXBase
from stix2.core import Bundle, parse
from stix2.core import parse
from stix2.datastore import (DataSink, DataSource, DataSourceError,
DataStoreMixin)
from stix2.datastore.filters import Filter, FilterSet, apply_common_filters

View File

@ -80,6 +80,7 @@ INTRUSION_SET_KWARGS = dict(
MALWARE_KWARGS = dict(
labels=['ransomware'],
name="Cryptolocker",
is_family=False
)
MALWARE_MORE_KWARGS = dict(
@ -89,7 +90,8 @@ MALWARE_MORE_KWARGS = dict(
modified="2016-04-06T20:03:00.000Z",
labels=['ransomware'],
name="Cryptolocker",
description="A ransomware related to ..."
description="A ransomware related to ...",
is_family=False
)
OBSERVED_DATA_KWARGS = dict(

View File

@ -26,9 +26,10 @@
"object_marking_refs": [
"marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168"
],
"type": "malware"
"type": "malware",
"is_family": false
}
],
"spec_version": "2.0",
"spec_version": "2.1",
"type": "bundle"
}

View File

@ -26,9 +26,10 @@
"object_marking_refs": [
"marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168"
],
"type": "malware"
"type": "malware",
"is_family": false
}
],
"spec_version": "2.0",
"spec_version": "2.1",
"type": "bundle"
}

View File

@ -26,9 +26,10 @@
"object_marking_refs": [
"marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168"
],
"type": "malware"
"type": "malware",
"is_family": false
}
],
"spec_version": "2.0",
"spec_version": "2.1",
"type": "bundle"
}

View File

@ -26,9 +26,10 @@
"object_marking_refs": [
"marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168"
],
"type": "malware"
"type": "malware",
"is_family": false
}
],
"spec_version": "2.0",
"spec_version": "2.1",
"type": "bundle"
}

View File

@ -3,11 +3,12 @@ import json
import pytest
import stix2
import stix2.v20.sdo
import stix2.v21.bundle
EXPECTED_BUNDLE = """{
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000007",
"spec_version": "2.0",
"objects": [
{
"type": "indicator",
@ -22,16 +23,19 @@ EXPECTED_BUNDLE = """{
},
{
"type": "malware",
"spec_version": "2.1",
"id": "malware--00000000-0000-0000-0000-000000000003",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"name": "Cryptolocker",
"labels": [
"ransomware"
]
],
"is_family": false
},
{
"type": "relationship",
"spec_version": "2.1",
"id": "relationship--00000000-0000-0000-0000-000000000005",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
@ -45,7 +49,6 @@ EXPECTED_BUNDLE = """{
EXPECTED_BUNDLE_DICT = {
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000007",
"spec_version": "2.0",
"objects": [
{
"type": "indicator",
@ -60,16 +63,19 @@ EXPECTED_BUNDLE_DICT = {
},
{
"type": "malware",
"spec_version": "2.1",
"id": "malware--00000000-0000-0000-0000-000000000003",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"name": "Cryptolocker",
"labels": [
"ransomware"
]
],
"is_family": False
},
{
"type": "relationship",
"spec_version": "2.1",
"id": "relationship--00000000-0000-0000-0000-000000000005",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
@ -86,7 +92,6 @@ def test_empty_bundle():
assert bundle.type == "bundle"
assert bundle.id.startswith("bundle--")
assert bundle.spec_version == "2.0"
with pytest.raises(AttributeError):
assert bundle.objects
@ -111,16 +116,6 @@ def test_bundle_id_must_start_with_bundle():
assert str(excinfo.value) == "Invalid value for Bundle 'id': must start with 'bundle--'."
def test_bundle_with_wrong_spec_version():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.Bundle(spec_version="1.2")
assert excinfo.value.cls == stix2.Bundle
assert excinfo.value.prop_name == "spec_version"
assert excinfo.value.reason == "must equal '2.0'."
assert str(excinfo.value) == "Invalid value for Bundle 'spec_version': must equal '2.0'."
def test_create_bundle1(indicator, malware, relationship):
bundle = stix2.Bundle(objects=[indicator, malware, relationship])
@ -178,14 +173,14 @@ def test_create_bundle_invalid(indicator, malware, relationship):
assert excinfo.value.reason == 'This property may not contain a Bundle object'
@pytest.mark.parametrize("version", ["2.0"])
@pytest.mark.parametrize("version", ["2.1"])
def test_parse_bundle(version):
bundle = stix2.parse(EXPECTED_BUNDLE, version=version)
assert bundle.type == "bundle"
assert bundle.id.startswith("bundle--")
assert bundle.spec_version == "2.0"
assert type(bundle.objects[0]) is stix2.Indicator
# TODO: update this to a STIX 2.1 indicator
assert type(bundle.objects[0]) is stix2.v20.sdo.Indicator
assert bundle.objects[0].type == 'indicator'
assert bundle.objects[1].type == 'malware'
assert bundle.objects[2].type == 'relationship'
@ -208,7 +203,7 @@ def test_parse_unknown_type():
def test_stix_object_property():
prop = stix2.core.STIXObjectProperty()
prop = stix2.v21.bundle.STIXObjectProperty()
identity = stix2.Identity(name="test", identity_class="individual")
assert prop.clean(identity) is identity

View File

@ -1,6 +1,8 @@
import pytest
import stix2
import stix2.base
import stix2.v20.sdo
from .constants import FAKE_TIME, MARKING_DEFINITION_ID
@ -93,7 +95,8 @@ def test_identity_custom_property_allowed():
def test_parse_identity_custom_property(data):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
identity = stix2.parse(data)
assert excinfo.value.cls == stix2.Identity
# TODO: update to create and check a STIX 2.1 Identity object
assert excinfo.value.cls == stix2.v20.sdo.Identity
assert excinfo.value.properties == ['foo']
assert "Unexpected properties for" in str(excinfo.value)
@ -358,8 +361,8 @@ def test_parse_custom_object_type():
"property1": "something"
}"""
nt = stix2.parse(nt_string)
assert nt.property1 == 'something'
nt = stix2.parse(nt_string, allow_custom=True)
assert nt["property1"] == 'something'
def test_parse_unregistered_custom_object_type():
@ -535,7 +538,7 @@ def test_parse_custom_observable_object():
}"""
nt = stix2.parse_observable(nt_string, [])
assert isinstance(nt, stix2.core._STIXBase)
assert isinstance(nt, stix2.base._STIXBase)
assert nt.property1 == 'something'
@ -553,7 +556,7 @@ def test_parse_unregistered_custom_observable_object():
assert parsed_custom['property1'] == 'something'
with pytest.raises(AttributeError) as excinfo:
assert parsed_custom.property1 == 'something'
assert not isinstance(parsed_custom, stix2.core._STIXBase)
assert not isinstance(parsed_custom, stix2.base._STIXBase)
def test_parse_unregistered_custom_observable_object_with_no_type():
@ -844,7 +847,7 @@ def test_parse_observable_with_unregistered_custom_extension():
parsed_ob = stix2.parse_observable(input_str, allow_custom=True)
assert parsed_ob['extensions']['x-foobar-ext']['property1'] == 'foo'
assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.core._STIXBase)
assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.base._STIXBase)
def test_register_custom_object():

View File

@ -220,7 +220,6 @@ def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
bund = {
"type": "bundle",
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
"spec_version": "2.0",
"objects": [
{
"name": "Atilla",
@ -264,7 +263,7 @@ def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
# add json-encoded stix bundle
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
' "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
fs_sink.add(bund2)
@ -348,8 +347,8 @@ def test_filesystem_store_query_single_filter(fs_store):
def test_filesystem_store_empty_query(fs_store):
results = fs_store.query() # returns all
assert len(results) == 26
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results]
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results]
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj["id"] for obj in results]
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj["id"] for obj in results]
def test_filesystem_store_query_multiple_filters(fs_store):
@ -450,8 +449,8 @@ def test_filesystem_custom_object(fs_store):
fs_store.add(newobj)
newobj_r = fs_store.get(newobj.id)
assert newobj_r.id == newobj.id
assert newobj_r.property1 == 'something'
assert newobj_r["id"] == newobj["id"]
assert newobj_r["property1"] == 'something'
# remove dir
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)

View File

@ -9,12 +9,14 @@ stix_objs = [
"created": "2017-01-27T13:49:53.997Z",
"description": "\n\nTITLE:\n\tPoison Ivy",
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"spec_version": "2.1",
"labels": [
"remote-access-trojan"
],
"modified": "2017-01-27T13:49:53.997Z",
"name": "Poison Ivy",
"type": "malware"
"type": "malware",
"is_family": False
},
{
"created": "2014-05-08T09:00:00.000Z",

View File

@ -224,7 +224,6 @@ def test_add_dict_bundle_object(collection):
ta = {
"type": "bundle",
"id": "bundle--860ccc8d-56c9-4fda-9384-84276fb52fb1",
"spec_version": "2.0",
"objects": [
{
"type": "threat-actor",

View File

@ -190,13 +190,15 @@ def test_parse_malware():
env = stix2.Environment()
data = """{
"type": "malware",
"spec_version": "2.1",
"id": "malware--fedcba98-7654-3210-fedc-ba9876543210",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"name": "Cryptolocker",
"labels": [
"ransomware"
]
],
"is_family": false
}"""
mal = env.parse(data)

View File

@ -10,6 +10,7 @@ from .constants import LOCATION_ID
EXPECTED_LOCATION_1 = """{
"type": "location",
"spec_version": "2.1",
"id": "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64",
"created": "2016-04-06T20:03:00.000Z",
"modified": "2016-04-06T20:03:00.000Z",
@ -19,6 +20,7 @@ EXPECTED_LOCATION_1 = """{
EXPECTED_LOCATION_1_REPR = "Location(" + " ".join("""
type='location',
spec_version='2.1',
id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64',
created='2016-04-06T20:03:00.000Z',
modified='2016-04-06T20:03:00.000Z',
@ -27,6 +29,7 @@ EXPECTED_LOCATION_1_REPR = "Location(" + " ".join("""
EXPECTED_LOCATION_2 = """{
"type": "location",
"spec_version": "2.1",
"id": "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64",
"created": "2016-04-06T20:03:00.000Z",
"modified": "2016-04-06T20:03:00.000Z",
@ -36,6 +39,7 @@ EXPECTED_LOCATION_2 = """{
EXPECTED_LOCATION_2_REPR = "Location(" + " ".join("""
type='location',
spec_version='2.1',
id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64',
created='2016-04-06T20:03:00.000Z',
modified='2016-04-06T20:03:00.000Z',
@ -63,6 +67,7 @@ def test_location_with_some_required_properties():
EXPECTED_LOCATION_2,
{
"type": "location",
"spec_version": "2.1",
"id": "location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64",
"created": "2016-04-06T20:03:00.000Z",
"modified": "2016-04-06T20:03:00.000Z",

View File

@ -10,13 +10,15 @@ from .constants import FAKE_TIME, MALWARE_ID, MALWARE_KWARGS
EXPECTED_MALWARE = """{
"type": "malware",
"spec_version": "2.1",
"id": "malware--fedcba98-7654-3210-fedc-ba9876543210",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
"name": "Cryptolocker",
"labels": [
"ransomware"
]
],
"is_family": false
}"""
@ -30,6 +32,7 @@ def test_malware_with_all_required_properties():
modified=now,
labels=["ransomware"],
name="Cryptolocker",
is_family=False
)
assert str(mal) == EXPECTED_MALWARE
@ -76,12 +79,12 @@ def test_malware_required_properties():
stix2.Malware()
assert excinfo.value.cls == stix2.Malware
assert excinfo.value.properties == ["labels", "name"]
assert excinfo.value.properties == ["is_family", "labels", "name"]
def test_malware_required_property_name():
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
stix2.Malware(labels=['ransomware'])
stix2.Malware(labels=['ransomware'], is_family=False)
assert excinfo.value.cls == stix2.Malware
assert excinfo.value.properties == ["name"]
@ -107,11 +110,13 @@ def test_invalid_kwarg_to_malware():
EXPECTED_MALWARE,
{
"type": "malware",
"spec_version": "2.1",
"id": "malware--fedcba98-7654-3210-fedc-ba9876543210",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
"labels": ["ransomware"],
"name": "Cryptolocker",
"is_family": False
},
])
def test_parse_malware(data):
@ -156,6 +161,6 @@ def test_parse_malware_clean_kill_chain_phases():
"phase_name": 1
}
]"""
data = EXPECTED_MALWARE.replace('malware"', 'malware",%s' % kill_chain)
data = EXPECTED_MALWARE.replace('2.1"', '2.1",%s' % kill_chain)
mal = stix2.parse(data)
assert mal['kill_chain_phases'][0]['phase_name'] == "1"

View File

@ -15,6 +15,7 @@ DESCRIPTION = ('This note indicates the various steps taken by the threat'
EXPECTED_NOTE = """{
"type": "note",
"spec_version": "2.1",
"id": "note--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
@ -36,6 +37,7 @@ EXPECTED_NOTE = """{
EXPECTED_OPINION_REPR = "Note(" + " ".join(("""
type='note',
spec_version='2.1',
id='note--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061',
created='2016-05-12T08:17:27.000Z',
modified='2016-05-12T08:17:27.000Z',
@ -76,6 +78,7 @@ def test_note_with_required_properties():
EXPECTED_NOTE,
{
"type": "note",
"spec_version": "2.1",
"id": "note--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",

View File

@ -16,6 +16,7 @@ DESCRIPTION = ('This doesn\'t seem like it is feasible. We\'ve seen how '
EXPECTED_OPINION = """{
"type": "opinion",
"spec_version": "2.1",
"id": "opinion--b01efc25-77b4-4003-b18b-f6e24b5cd9f7",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
@ -28,6 +29,7 @@ EXPECTED_OPINION = """{
EXPECTED_OPINION_REPR = "Opinion(" + " ".join(("""
type='opinion',
spec_version='2.1',
id='opinion--b01efc25-77b4-4003-b18b-f6e24b5cd9f7',
created='2016-05-12T08:17:27.000Z',
modified='2016-05-12T08:17:27.000Z',
@ -58,6 +60,7 @@ def test_opinion_with_required_properties():
EXPECTED_OPINION,
{
"type": "opinion",
"spec_version": "2.1",
"id": "opinion--b01efc25-77b4-4003-b18b-f6e24b5cd9f7",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",

View File

@ -10,6 +10,7 @@ from .constants import (FAKE_TIME, INDICATOR_ID, MALWARE_ID, RELATIONSHIP_ID,
EXPECTED_RELATIONSHIP = """{
"type": "relationship",
"spec_version": "2.1",
"id": "relationship--00000000-1111-2222-3333-444444444444",
"created": "2016-04-06T20:06:37.000Z",
"modified": "2016-04-06T20:06:37.000Z",

View File

@ -217,6 +217,7 @@ def test_revoke_invalid_cls():
def test_remove_custom_stix_property():
mal = stix2.Malware(name="ColePowers",
labels=["rootkit"],
is_family=False,
x_custom="armada",
allow_custom=True)

View File

@ -1,7 +1,8 @@
import os
import stix2
from stix2.workbench import (AttackPattern, Bundle, Campaign, CourseOfAction,
from stix2 import Bundle
from stix2.workbench import (AttackPattern, Campaign, CourseOfAction,
ExternalReference, FileSystemSource, Filter,
Identity, Indicator, IntrusionSet, Malware,
MarkingDefinition, ObservedData, Relationship,
@ -190,7 +191,8 @@ def test_workbench_related():
def test_workbench_related_with_filters():
malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID)
malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID,
is_family=False)
rel = Relationship(malware.id, 'variant-of', MALWARE_ID)
save([malware, rel])

View File

@ -1,7 +1,7 @@
# flake8: noqa
from ..core import Bundle
from .bundle import Bundle
from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking,
ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)

77
stix2/v20/bundle.py Normal file
View File

@ -0,0 +1,77 @@
from collections import OrderedDict
from stix2 import parse
from stix2.base import _STIXBase
from stix2.properties import (IDProperty, ListProperty, Property,
StringProperty, TypeProperty)
from stix2.utils import _get_dict, get_class_hierarchy_names
class STIXObjectProperty(Property):
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(STIXObjectProperty, self).__init__(*args, **kwargs)
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
for x in get_class_hierarchy_names(value)):
# A simple "is this a spec version 2.1+ object" test. For now,
# limit 2.0 bundles to 2.0 objects. It's not possible yet to
# have validation co-constraints among properties, e.g. have
# validation here depend on the value of another property
# (spec_version). So this is a hack, and not technically spec-
# compliant.
if "spec_version" in value:
raise ValueError("Spec version 2.0 bundles don't yet support "
"containing objects of a different spec "
"version.")
return value
try:
dictified = _get_dict(value)
except ValueError:
raise ValueError("This property may only contain a dictionary or object")
if dictified == {}:
raise ValueError("This property may only contain a non-empty dictionary or object")
if 'type' in dictified and dictified['type'] == 'bundle':
raise ValueError('This property may not contain a Bundle object')
if "spec_version" in dictified:
# See above comment regarding spec_version.
raise ValueError("Spec version 2.0 bundles don't yet support "
"containing objects of a different spec version.")
parsed_obj = parse(dictified, allow_custom=self.allow_custom)
return parsed_obj
class Bundle(_STIXBase):
"""For more detailed information on this object's properties, see
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part1-stix-core/stix-v2.0-cs01-part1-stix-core.html#_Toc496709293>`__.
"""
_type = 'bundle'
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
# Not technically correct: STIX 2.0 spec doesn't say spec_version must
# have this value, but it's all we support for now.
('spec_version', StringProperty(fixed="2.0")),
('objects', ListProperty(STIXObjectProperty)),
])
def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg.
if args:
if isinstance(args[0], list):
kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', [])
else:
kwargs['objects'] = list(args) + kwargs.get('objects', [])
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False)
super(Bundle, self).__init__(**kwargs)

View File

@ -1,7 +1,7 @@
# flake8: noqa
from ..core import Bundle
from .bundle import Bundle
from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking,
ExternalReference, GranularMarking, KillChainPhase,
LanguageContent, MarkingDefinition, StatementMarking,

60
stix2/v21/bundle.py Normal file
View File

@ -0,0 +1,60 @@
from collections import OrderedDict
from stix2 import parse
from stix2.base import _STIXBase
from stix2.properties import IDProperty, ListProperty, Property, TypeProperty
from stix2.utils import _get_dict, get_class_hierarchy_names
class STIXObjectProperty(Property):
def __init__(self, allow_custom=False, *args, **kwargs):
self.allow_custom = allow_custom
super(STIXObjectProperty, self).__init__(*args, **kwargs)
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
for x in get_class_hierarchy_names(value)):
return value
try:
dictified = _get_dict(value)
except ValueError:
raise ValueError("This property may only contain a dictionary or object")
if dictified == {}:
raise ValueError("This property may only contain a non-empty dictionary or object")
if 'type' in dictified and dictified['type'] == 'bundle':
raise ValueError('This property may not contain a Bundle object')
parsed_obj = parse(dictified, allow_custom=self.allow_custom)
return parsed_obj
class Bundle(_STIXBase):
"""For more detailed information on this object's properties, see
TODO: Update this to a STIX 2.1 link.
`the STIX 2.0 specification <http://docs.oasis-open.org/cti/stix/v2.0/cs01/part1-stix-core/stix-v2.0-cs01-part1-stix-core.html#_Toc496709293>`__.
"""
_type = 'bundle'
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('objects', ListProperty(STIXObjectProperty)),
])
def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg.
if args:
if isinstance(args[0], list):
kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', [])
else:
kwargs['objects'] = list(args) + kwargs.get('objects', [])
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].contained.allow_custom = kwargs.get('allow_custom', False)
super(Bundle, self).__init__(**kwargs)

View File

@ -7,7 +7,8 @@ import stix2
from ..base import _STIXBase
from ..markings import _MarkingsMixin
from ..properties import (BooleanProperty, EnumProperty, FloatProperty,
from ..properties import (BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, FloatProperty,
IDProperty, IntegerProperty, ListProperty,
PatternProperty, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
@ -205,6 +206,7 @@ class Location(STIXDomainObject):
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
@ -229,6 +231,32 @@ class Location(STIXDomainObject):
])
class AnalysisType(_STIXBase):
_properties = OrderedDict()
_properties.update([
('start_time', TimestampProperty()),
('end_time', TimestampProperty()),
('analysis_tools', ObservableProperty()),
('analysis_environment', DictionaryProperty()),
('results', DictionaryProperty(required=True))
])
class AVResultsType(_STIXBase):
_properties = OrderedDict()
_properties.update([
('product', StringProperty()),
('engine_version', StringProperty()),
('definition_version', StringProperty()),
('submitted', TimestampProperty()),
('scanned', TimestampProperty()),
('result', StringProperty()),
('details', StringProperty())
])
class Malware(STIXDomainObject):
# TODO: Add link
"""For more detailed information on this object's properties, see
@ -239,6 +267,7 @@ class Malware(STIXDomainObject):
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
@ -253,6 +282,17 @@ class Malware(STIXDomainObject):
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
('is_family', BooleanProperty(required=True)),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('os_execution_envs', ListProperty(StringProperty)),
('architecture_execution_envs', ListProperty(StringProperty)),
('implementation_languages', ListProperty(StringProperty)),
('samples', ObservableProperty()),
('static_analysis_results', ListProperty(EmbeddedObjectProperty(AnalysisType))),
('dynamic_analysis_results', ListProperty(EmbeddedObjectProperty(AnalysisType))),
('av_results', ListProperty(EmbeddedObjectProperty(AVResultsType))),
('capabilities', ListProperty(StringProperty))
])
@ -266,6 +306,7 @@ class Note(STIXDomainObject):
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
@ -328,6 +369,7 @@ class Opinion(STIXDomainObject):
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),

View File

@ -25,6 +25,7 @@ class Relationship(STIXRelationshipObject):
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('spec_version', StringProperty(fixed="2.1")),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),