Merge pull request #22 from oasis-open/custom
Custom properties, object types, and cyber observablesstix2.1
commit
07ccf9ec03
|
@ -95,46 +95,70 @@ EXT_MAP = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def parse(data):
|
def parse(data, allow_custom=False):
|
||||||
"""Deserialize a string or file-like object into a STIX object"""
|
"""Deserialize a string or file-like object into a STIX object.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: The STIX 2 string to be parsed.
|
||||||
|
allow_custom (bool): Whether to allow custom properties or not. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
An instantiated Python STIX object.
|
||||||
|
"""
|
||||||
|
|
||||||
obj = get_dict(data)
|
obj = get_dict(data)
|
||||||
|
|
||||||
if 'type' not in obj:
|
if 'type' not in obj:
|
||||||
# TODO parse external references, kill chain phases, and granular markings
|
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj))
|
||||||
pass
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
obj_class = OBJ_MAP[obj['type']]
|
|
||||||
except KeyError:
|
|
||||||
# TODO handle custom objects
|
|
||||||
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
|
|
||||||
return obj_class(**obj)
|
|
||||||
|
|
||||||
return obj
|
try:
|
||||||
|
obj_class = OBJ_MAP[obj['type']]
|
||||||
|
except KeyError:
|
||||||
|
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type'])
|
||||||
|
return obj_class(allow_custom=allow_custom, **obj)
|
||||||
|
|
||||||
|
|
||||||
def parse_observable(data, _valid_refs):
|
def parse_observable(data, _valid_refs=[], allow_custom=False):
|
||||||
"""Deserialize a string or file-like object into a STIX Cyber Observable
|
"""Deserialize a string or file-like object into a STIX Cyber Observable object.
|
||||||
object.
|
|
||||||
|
Args:
|
||||||
|
data: The STIX 2 string to be parsed.
|
||||||
|
_valid_refs: A list of object references valid for the scope of the object being parsed.
|
||||||
|
allow_custom: Whether to allow custom properties or not. Default: False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
An instantiated Python STIX Cyber Observable object.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
obj = get_dict(data)
|
obj = get_dict(data)
|
||||||
obj['_valid_refs'] = _valid_refs
|
obj['_valid_refs'] = _valid_refs
|
||||||
|
|
||||||
if 'type' not in obj:
|
if 'type' not in obj:
|
||||||
raise ValueError("'type' is a required property!")
|
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj))
|
||||||
try:
|
try:
|
||||||
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# TODO handle custom observable objects
|
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type'])
|
||||||
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
|
|
||||||
|
|
||||||
if 'extensions' in obj and obj['type'] in EXT_MAP:
|
if 'extensions' in obj and obj['type'] in EXT_MAP:
|
||||||
for name, ext in obj['extensions'].items():
|
for name, ext in obj['extensions'].items():
|
||||||
if name not in EXT_MAP[obj['type']]:
|
if name not in EXT_MAP[obj['type']]:
|
||||||
raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
|
raise exceptions.ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
|
||||||
ext_class = EXT_MAP[obj['type']][name]
|
ext_class = EXT_MAP[obj['type']][name]
|
||||||
obj['extensions'][name] = ext_class(**obj['extensions'][name])
|
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
|
||||||
|
|
||||||
return obj_class(**obj)
|
return obj_class(allow_custom=allow_custom, **obj)
|
||||||
|
|
||||||
|
|
||||||
|
def _register_type(new_type):
|
||||||
|
"""Register a custom STIX Object type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
OBJ_MAP[new_type._type] = new_type
|
||||||
|
|
||||||
|
|
||||||
|
def _register_observable(new_observable):
|
||||||
|
"""Register a custom STIX Cyber Observable type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
|
||||||
|
|
|
@ -83,20 +83,26 @@ class _STIXBase(collections.Mapping):
|
||||||
# TODO: check selectors
|
# TODO: check selectors
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, allow_custom=False, **kwargs):
|
||||||
cls = self.__class__
|
cls = self.__class__
|
||||||
|
|
||||||
# Use the same timestamp for any auto-generated datetimes
|
# Use the same timestamp for any auto-generated datetimes
|
||||||
self.__now = get_timestamp()
|
self.__now = get_timestamp()
|
||||||
|
|
||||||
# Detect any keyword arguments not allowed for a specific type
|
# Detect any keyword arguments not allowed for a specific type
|
||||||
extra_kwargs = list(set(kwargs) - set(cls._properties))
|
custom_props = kwargs.pop('custom_properties', {})
|
||||||
if extra_kwargs:
|
if custom_props and not isinstance(custom_props, dict):
|
||||||
raise ExtraPropertiesError(cls, extra_kwargs)
|
raise ValueError("'custom_properties' must be a dictionary")
|
||||||
|
if not allow_custom:
|
||||||
|
extra_kwargs = list(set(kwargs) - set(cls._properties))
|
||||||
|
if extra_kwargs:
|
||||||
|
raise ExtraPropertiesError(cls, extra_kwargs)
|
||||||
|
|
||||||
# Remove any keyword arguments whose value is None
|
# Remove any keyword arguments whose value is None
|
||||||
setting_kwargs = {}
|
setting_kwargs = {}
|
||||||
for prop_name, prop_value in kwargs.items():
|
props = kwargs.copy()
|
||||||
|
props.update(custom_props)
|
||||||
|
for prop_name, prop_value in props.items():
|
||||||
if prop_value is not None:
|
if prop_value is not None:
|
||||||
setting_kwargs[prop_name] = prop_value
|
setting_kwargs[prop_name] = prop_value
|
||||||
|
|
||||||
|
|
|
@ -150,3 +150,10 @@ class RevokeError(STIXError, ValueError):
|
||||||
return "Cannot revoke an already revoked object."
|
return "Cannot revoke an already revoked object."
|
||||||
else:
|
else:
|
||||||
return "Cannot create a new version of a revoked object."
|
return "Cannot create a new version of a revoked object."
|
||||||
|
|
||||||
|
|
||||||
|
class ParseError(STIXError, ValueError):
|
||||||
|
"""Could not parse object"""
|
||||||
|
|
||||||
|
def __init__(self, msg):
|
||||||
|
super(ParseError, self).__init__(msg)
|
||||||
|
|
|
@ -5,6 +5,8 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable
|
||||||
and do not have a '_type' attribute.
|
and do not have a '_type' attribute.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import stix2
|
||||||
|
|
||||||
from .base import _Extension, _Observable, _STIXBase
|
from .base import _Extension, _Observable, _STIXBase
|
||||||
from .exceptions import AtLeastOnePropertyError, DependentPropertiesError
|
from .exceptions import AtLeastOnePropertyError, DependentPropertiesError
|
||||||
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
|
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
|
||||||
|
@ -586,3 +588,27 @@ class X509Certificate(_Observable):
|
||||||
'subject_public_key_exponent': IntegerProperty(),
|
'subject_public_key_exponent': IntegerProperty(),
|
||||||
'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType),
|
'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def CustomObservable(type='x-custom-observable', properties={}):
|
||||||
|
"""Custom STIX Cyber Observable type decorator
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def custom_builder(cls):
|
||||||
|
|
||||||
|
class _Custom(cls, _Observable):
|
||||||
|
_type = type
|
||||||
|
_properties = {
|
||||||
|
'type': TypeProperty(_type),
|
||||||
|
}
|
||||||
|
_properties.update(properties)
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
_Observable.__init__(self, **kwargs)
|
||||||
|
cls.__init__(self, **kwargs)
|
||||||
|
|
||||||
|
stix2._register_observable(_Custom)
|
||||||
|
return _Custom
|
||||||
|
|
||||||
|
return custom_builder
|
||||||
|
|
50
stix2/sdo.py
50
stix2/sdo.py
|
@ -1,5 +1,7 @@
|
||||||
"""STIX 2.0 Domain Objects"""
|
"""STIX 2.0 Domain Objects"""
|
||||||
|
|
||||||
|
import stix2
|
||||||
|
|
||||||
from .base import _STIXBase
|
from .base import _STIXBase
|
||||||
from .common import COMMON_PROPERTIES
|
from .common import COMMON_PROPERTIES
|
||||||
from .other import KillChainPhase
|
from .other import KillChainPhase
|
||||||
|
@ -190,3 +192,51 @@ class Vulnerability(_STIXBase):
|
||||||
'name': StringProperty(required=True),
|
'name': StringProperty(required=True),
|
||||||
'description': StringProperty(),
|
'description': StringProperty(),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def CustomObject(type='x-custom-type', properties={}):
|
||||||
|
"""Custom STIX Object type decorator
|
||||||
|
|
||||||
|
Example 1:
|
||||||
|
|
||||||
|
@CustomObject('x-type-name', {
|
||||||
|
'property1': StringProperty(required=True),
|
||||||
|
'property2': IntegerProperty(),
|
||||||
|
})
|
||||||
|
class MyNewObjectType():
|
||||||
|
pass
|
||||||
|
|
||||||
|
Supply an __init__() function to add any special validations to the custom
|
||||||
|
type. Don't call super().__init() though - doing so will cause an error.
|
||||||
|
|
||||||
|
Example 2:
|
||||||
|
|
||||||
|
@CustomObject('x-type-name', {
|
||||||
|
'property1': StringProperty(required=True),
|
||||||
|
'property2': IntegerProperty(),
|
||||||
|
})
|
||||||
|
class MyNewObjectType():
|
||||||
|
def __init__(self, property2=None, **kwargs):
|
||||||
|
if property2 and property2 < 10:
|
||||||
|
raise ValueError("'property2' is too small.")
|
||||||
|
"""
|
||||||
|
|
||||||
|
def custom_builder(cls):
|
||||||
|
|
||||||
|
class _Custom(cls, _STIXBase):
|
||||||
|
_type = type
|
||||||
|
_properties = COMMON_PROPERTIES.copy()
|
||||||
|
_properties.update({
|
||||||
|
'id': IDProperty(_type),
|
||||||
|
'type': TypeProperty(_type),
|
||||||
|
})
|
||||||
|
_properties.update(properties)
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
_STIXBase.__init__(self, **kwargs)
|
||||||
|
cls.__init__(self, **kwargs)
|
||||||
|
|
||||||
|
stix2._register_type(_Custom)
|
||||||
|
return _Custom
|
||||||
|
|
||||||
|
return custom_builder
|
||||||
|
|
|
@ -0,0 +1,168 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import stix2
|
||||||
|
|
||||||
|
|
||||||
|
def test_identity_custom_property():
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
stix2.Identity(
|
||||||
|
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
|
created="2015-12-21T19:59:11Z",
|
||||||
|
modified="2015-12-21T19:59:11Z",
|
||||||
|
name="John Smith",
|
||||||
|
identity_class="individual",
|
||||||
|
custom_properties="foobar",
|
||||||
|
)
|
||||||
|
|
||||||
|
identity = stix2.Identity(
|
||||||
|
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
|
created="2015-12-21T19:59:11Z",
|
||||||
|
modified="2015-12-21T19:59:11Z",
|
||||||
|
name="John Smith",
|
||||||
|
identity_class="individual",
|
||||||
|
custom_properties={
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert identity.foo == "bar"
|
||||||
|
|
||||||
|
|
||||||
|
def test_identity_custom_property_invalid():
|
||||||
|
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
|
||||||
|
stix2.Identity(
|
||||||
|
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
|
created="2015-12-21T19:59:11Z",
|
||||||
|
modified="2015-12-21T19:59:11Z",
|
||||||
|
name="John Smith",
|
||||||
|
identity_class="individual",
|
||||||
|
x_foo="bar",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_identity_custom_property_allowed():
|
||||||
|
identity = stix2.Identity(
|
||||||
|
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
|
created="2015-12-21T19:59:11Z",
|
||||||
|
modified="2015-12-21T19:59:11Z",
|
||||||
|
name="John Smith",
|
||||||
|
identity_class="individual",
|
||||||
|
x_foo="bar",
|
||||||
|
allow_custom=True,
|
||||||
|
)
|
||||||
|
assert identity.x_foo == "bar"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("data", [
|
||||||
|
"""{
|
||||||
|
"type": "identity",
|
||||||
|
"id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
|
"created": "2015-12-21T19:59:11Z",
|
||||||
|
"modified": "2015-12-21T19:59:11Z",
|
||||||
|
"name": "John Smith",
|
||||||
|
"identity_class": "individual",
|
||||||
|
"foo": "bar"
|
||||||
|
}""",
|
||||||
|
])
|
||||||
|
def test_parse_identity_custom_property(data):
|
||||||
|
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
|
||||||
|
identity = stix2.parse(data)
|
||||||
|
|
||||||
|
identity = stix2.parse(data, allow_custom=True)
|
||||||
|
assert identity.foo == "bar"
|
||||||
|
|
||||||
|
|
||||||
|
@stix2.sdo.CustomObject('x-new-type', {
|
||||||
|
'property1': stix2.properties.StringProperty(required=True),
|
||||||
|
'property2': stix2.properties.IntegerProperty(),
|
||||||
|
})
|
||||||
|
class NewType():
|
||||||
|
def __init__(self, property2=None, **kwargs):
|
||||||
|
if property2 and property2 < 10:
|
||||||
|
raise ValueError("'property2' is too small.")
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_object_type():
|
||||||
|
nt = NewType(property1='something')
|
||||||
|
assert nt.property1 == 'something'
|
||||||
|
|
||||||
|
with pytest.raises(stix2.exceptions.MissingPropertiesError):
|
||||||
|
NewType(property2=42)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
NewType(property1='something', property2=4)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_custom_object_type():
|
||||||
|
nt_string = """{
|
||||||
|
"type": "x-new-type",
|
||||||
|
"created": "2015-12-21T19:59:11Z",
|
||||||
|
"property1": "something"
|
||||||
|
}"""
|
||||||
|
|
||||||
|
nt = stix2.parse(nt_string)
|
||||||
|
assert nt.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
@stix2.observables.CustomObservable('x-new-observable', {
|
||||||
|
'property1': stix2.properties.StringProperty(required=True),
|
||||||
|
'property2': stix2.properties.IntegerProperty(),
|
||||||
|
})
|
||||||
|
class NewObservable():
|
||||||
|
def __init__(self, property2=None, **kwargs):
|
||||||
|
if property2 and property2 < 10:
|
||||||
|
raise ValueError("'property2' is too small.")
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_observable_object():
|
||||||
|
no = NewObservable(property1='something')
|
||||||
|
assert no.property1 == 'something'
|
||||||
|
|
||||||
|
with pytest.raises(stix2.exceptions.MissingPropertiesError):
|
||||||
|
NewObservable(property2=42)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
NewObservable(property1='something', property2=4)
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_custom_observable_object():
|
||||||
|
nt_string = """{
|
||||||
|
"type": "x-new-observable",
|
||||||
|
"property1": "something"
|
||||||
|
}"""
|
||||||
|
|
||||||
|
nt = stix2.parse_observable(nt_string)
|
||||||
|
assert nt.property1 == 'something'
|
||||||
|
|
||||||
|
|
||||||
|
def test_observable_custom_property():
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
NewObservable(
|
||||||
|
property1='something',
|
||||||
|
custom_properties="foobar",
|
||||||
|
)
|
||||||
|
|
||||||
|
no = NewObservable(
|
||||||
|
property1='something',
|
||||||
|
custom_properties={
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert no.foo == "bar"
|
||||||
|
|
||||||
|
|
||||||
|
def test_observable_custom_property_invalid():
|
||||||
|
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
|
||||||
|
NewObservable(
|
||||||
|
property1='something',
|
||||||
|
x_foo="bar",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_observable_custom_property_allowed():
|
||||||
|
no = NewObservable(
|
||||||
|
property1='something',
|
||||||
|
x_foo="bar",
|
||||||
|
allow_custom=True,
|
||||||
|
)
|
||||||
|
assert no.x_foo == "bar"
|
|
@ -19,7 +19,7 @@ EXPECTED = """{
|
||||||
|
|
||||||
|
|
||||||
def test_identity_example():
|
def test_identity_example():
|
||||||
report = stix2.Identity(
|
identity = stix2.Identity(
|
||||||
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
created="2015-12-21T19:59:11Z",
|
created="2015-12-21T19:59:11Z",
|
||||||
modified="2015-12-21T19:59:11Z",
|
modified="2015-12-21T19:59:11Z",
|
||||||
|
@ -27,7 +27,7 @@ def test_identity_example():
|
||||||
identity_class="individual",
|
identity_class="individual",
|
||||||
)
|
)
|
||||||
|
|
||||||
assert str(report) == EXPECTED
|
assert str(identity) == EXPECTED
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("data", [
|
@pytest.mark.parametrize("data", [
|
||||||
|
@ -50,4 +50,16 @@ def test_parse_identity(data):
|
||||||
assert identity.modified == dt.datetime(2015, 12, 21, 19, 59, 11, tzinfo=pytz.utc)
|
assert identity.modified == dt.datetime(2015, 12, 21, 19, 59, 11, tzinfo=pytz.utc)
|
||||||
assert identity.name == "John Smith"
|
assert identity.name == "John Smith"
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_no_type():
|
||||||
|
with pytest.raises(stix2.exceptions.ParseError):
|
||||||
|
stix2.parse("""
|
||||||
|
{
|
||||||
|
"id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
|
||||||
|
"created": "2015-12-21T19:59:11Z",
|
||||||
|
"modified": "2015-12-21T19:59:11Z",
|
||||||
|
"name": "John Smith",
|
||||||
|
"identity_class": "individual"
|
||||||
|
}""")
|
||||||
|
|
||||||
# TODO: Add other examples
|
# TODO: Add other examples
|
||||||
|
|
Loading…
Reference in New Issue