2018-11-28 22:51:00 +01:00
|
|
|
"""Classes for representing properties of STIX Objects and Cyber Observables."""
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
import base64
|
|
|
|
import binascii
|
|
|
|
import collections
|
|
|
|
import copy
|
|
|
|
import inspect
|
|
|
|
import re
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
from six import string_types, text_type
|
|
|
|
from stix2patterns.validator import run_validator
|
|
|
|
|
2019-06-21 20:25:36 +02:00
|
|
|
import stix2
|
|
|
|
|
2019-01-09 16:46:48 +01:00
|
|
|
from .base import _Observable, _STIXBase
|
2018-07-10 20:50:03 +02:00
|
|
|
from .core import STIX2_OBJ_MAPS, parse, parse_observable
|
2019-08-29 23:15:51 +02:00
|
|
|
from .exceptions import (
|
|
|
|
CustomContentError, DictionaryKeyError, MissingPropertiesError,
|
|
|
|
MutuallyExclusivePropertiesError,
|
|
|
|
)
|
2018-07-10 20:50:03 +02:00
|
|
|
from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime
|
|
|
|
|
2018-07-05 18:25:48 +02:00
|
|
|
ERROR_INVALID_ID = (
|
2019-06-13 02:19:47 +02:00
|
|
|
"not a valid STIX identifier, must match <object-type>--<UUID>: {}"
|
2018-07-05 18:25:48 +02:00
|
|
|
)
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
|
2019-06-13 02:19:47 +02:00
|
|
|
def _check_uuid(uuid_str, spec_version):
|
|
|
|
"""
|
|
|
|
Check whether the given UUID string is valid with respect to the given STIX
|
|
|
|
spec version. STIX 2.0 requires UUIDv4; 2.1 only requires the RFC 4122
|
|
|
|
variant.
|
|
|
|
|
|
|
|
:param uuid_str: A UUID as a string
|
|
|
|
:param spec_version: The STIX spec version
|
|
|
|
:return: True if the UUID is valid, False if not
|
|
|
|
:raises ValueError: If uuid_str is malformed
|
|
|
|
"""
|
|
|
|
uuid_obj = uuid.UUID(uuid_str)
|
|
|
|
|
|
|
|
ok = uuid_obj.variant == uuid.RFC_4122
|
|
|
|
if ok and spec_version == "2.0":
|
|
|
|
ok = uuid_obj.version == 4
|
|
|
|
|
|
|
|
return ok
|
|
|
|
|
|
|
|
|
2019-06-14 00:37:21 +02:00
|
|
|
def _validate_id(id_, spec_version, required_prefix):
|
|
|
|
"""
|
|
|
|
Check the STIX identifier for correctness, raise an exception if there are
|
|
|
|
errors.
|
|
|
|
|
|
|
|
:param id_: The STIX identifier
|
|
|
|
:param spec_version: The STIX specification version to use
|
|
|
|
:param required_prefix: The required prefix on the identifier, if any.
|
|
|
|
This function doesn't add a "--" suffix to the prefix, so callers must
|
|
|
|
add it if it is important. Pass None to skip the prefix check.
|
|
|
|
:raises ValueError: If there are any errors with the identifier
|
|
|
|
"""
|
|
|
|
if required_prefix:
|
|
|
|
if not id_.startswith(required_prefix):
|
|
|
|
raise ValueError("must start with '{}'.".format(required_prefix))
|
|
|
|
|
|
|
|
try:
|
|
|
|
if required_prefix:
|
|
|
|
uuid_part = id_[len(required_prefix):]
|
|
|
|
else:
|
|
|
|
idx = id_.index("--")
|
|
|
|
uuid_part = id_[idx+2:]
|
|
|
|
|
|
|
|
result = _check_uuid(uuid_part, spec_version)
|
|
|
|
except ValueError:
|
|
|
|
# replace their ValueError with ours
|
|
|
|
raise ValueError(ERROR_INVALID_ID.format(id_))
|
|
|
|
|
|
|
|
if not result:
|
|
|
|
raise ValueError(ERROR_INVALID_ID.format(id_))
|
|
|
|
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
class Property(object):
|
|
|
|
"""Represent a property of STIX data type.
|
|
|
|
|
|
|
|
Subclasses can define the following attributes as keyword arguments to
|
|
|
|
``__init__()``.
|
|
|
|
|
|
|
|
Args:
|
2018-07-12 20:31:14 +02:00
|
|
|
required (bool): If ``True``, the property must be provided when
|
|
|
|
creating an object with that property. No default value exists for
|
|
|
|
these properties. (Default: ``False``)
|
2018-07-10 20:50:03 +02:00
|
|
|
fixed: This provides a constant default value. Users are free to
|
2018-07-12 20:31:14 +02:00
|
|
|
provide this value explicity when constructing an object (which
|
|
|
|
allows you to copy **all** values from an existing object to a new
|
|
|
|
object), but if the user provides a value other than the ``fixed``
|
|
|
|
value, it will raise an error. This is semantically equivalent to
|
|
|
|
defining both:
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
- a ``clean()`` function that checks if the value matches the fixed
|
|
|
|
value, and
|
|
|
|
- a ``default()`` function that returns the fixed value.
|
|
|
|
|
|
|
|
Subclasses can also define the following functions:
|
|
|
|
|
|
|
|
- ``def clean(self, value) -> any:``
|
|
|
|
- Return a value that is valid for this property. If ``value`` is not
|
|
|
|
valid for this property, this will attempt to transform it first. If
|
2018-07-12 20:31:14 +02:00
|
|
|
``value`` is not valid and no such transformation is possible, it
|
Improved the exception class hierarchy:
- Removed all plain python base classes (e.g. ValueError, TypeError)
- Renamed InvalidPropertyConfigurationError -> PropertyPresenceError,
since incorrect values could be considered a property config error, and
I really just wanted this class to apply to presence (co-)constraint
violations.
- Added ObjectConfigurationError as a superclass of InvalidValueError,
PropertyPresenceError, and any other exception that could be raised
during _STIXBase object init, which is when the spec compliance
checks happen. This class is intended to represent general spec
violations.
- Did some class reordering in exceptions.py, so all the
ObjectConfigurationError subclasses were together.
Changed how property "cleaning" errors were handled:
- Previous docs said they should all be ValueErrors, but that would require
extra exception check-and-replace complexity in the property
implementations, so that requirement is removed. Doc is changed to just
say that cleaning problems should cause exceptions to be raised.
_STIXBase._check_property() now handles most exception types, not just
ValueError.
- Decided to try chaining the original clean error to the InvalidValueError,
in case the extra diagnostics would be helpful in the future. This is
done via 'six' adapter function and only works on python3.
- A small amount of testing was removed, since it was looking at custom
exception properties which became unavailable once the exception was
replaced with InvalidValueError.
Did another pass through unit tests to fix breakage caused by the changed
exception class hierarchy.
Removed unnecessary observable extension handling code from
parse_observable(), since it was all duplicated in ExtensionsProperty.
The redundant code in parse_observable() had different exception behavior
than ExtensionsProperty, which makes the API inconsistent and unit tests
more complicated. (Problems in ExtensionsProperty get replaced with
InvalidValueError, but extensions problems handled directly in
parse_observable() don't get the same replacement, and so the exception
type is different.)
Redid the workbench monkeypatching. The old way was impossible to make
work, and had caused ugly ripple effect hackage in other parts of the
codebase. Now, it replaces the global object maps with factory functions
which behave the same way when called, as real classes. Had to fix up a
few unit tests to get them all passing with this monkeypatching in place.
Also remove all the xfail markings in the workbench test suite, since all
tests now pass.
Since workbench monkeypatching isn't currently affecting any unit tests,
tox.ini was simplified to remove the special-casing for running the
workbench tests.
Removed the v20 workbench test suite, since the workbench currently only
works with the latest stix object version.
2019-07-19 20:50:11 +02:00
|
|
|
should raise an exception.
|
2018-07-10 20:50:03 +02:00
|
|
|
- ``def default(self):``
|
|
|
|
- provide a default value for this property.
|
|
|
|
- ``default()`` can return the special value ``NOW`` to use the current
|
2018-07-12 20:31:14 +02:00
|
|
|
time. This is useful when several timestamps in the same object
|
|
|
|
need to use the same default value, so calling now() for each
|
|
|
|
property-- likely several microseconds apart-- does not work.
|
|
|
|
|
|
|
|
Subclasses can instead provide a lambda function for ``default`` as a
|
|
|
|
keyword argument. ``clean`` should not be provided as a lambda since
|
|
|
|
lambdas cannot raise their own exceptions.
|
|
|
|
|
|
|
|
When instantiating Properties, ``required`` and ``default`` should not be
|
|
|
|
used together. ``default`` implies that the property is required in the
|
|
|
|
specification so this function will be used to supply a value if none is
|
|
|
|
provided. ``required`` means that the user must provide this; it is
|
|
|
|
required in the specification and we can't or don't want to create a
|
|
|
|
default value.
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
"""
|
|
|
|
|
|
|
|
def _default_clean(self, value):
|
|
|
|
if value != self._fixed_value:
|
2018-07-12 20:31:14 +02:00
|
|
|
raise ValueError("must equal '{}'.".format(self._fixed_value))
|
2018-07-10 20:50:03 +02:00
|
|
|
return value
|
|
|
|
|
2018-07-05 18:39:44 +02:00
|
|
|
def __init__(self, required=False, fixed=None, default=None):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.required = required
|
|
|
|
if fixed:
|
|
|
|
self._fixed_value = fixed
|
|
|
|
self.clean = self._default_clean
|
|
|
|
self.default = lambda: fixed
|
|
|
|
if default:
|
|
|
|
self.default = default
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
return value
|
|
|
|
|
|
|
|
def __call__(self, value=None):
|
|
|
|
"""Used by ListProperty to handle lists that have been defined with
|
|
|
|
either a class or an instance.
|
|
|
|
"""
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
class ListProperty(Property):
|
|
|
|
|
|
|
|
def __init__(self, contained, **kwargs):
|
|
|
|
"""
|
|
|
|
``contained`` should be a function which returns an object from the value.
|
|
|
|
"""
|
|
|
|
if inspect.isclass(contained) and issubclass(contained, Property):
|
|
|
|
# If it's a class and not an instance, instantiate it so that
|
|
|
|
# clean() can be called on it, and ListProperty.clean() will
|
|
|
|
# use __call__ when it appends the item.
|
|
|
|
self.contained = contained()
|
|
|
|
else:
|
|
|
|
self.contained = contained
|
|
|
|
super(ListProperty, self).__init__(**kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
try:
|
|
|
|
iter(value)
|
|
|
|
except TypeError:
|
|
|
|
raise ValueError("must be an iterable.")
|
|
|
|
|
|
|
|
if isinstance(value, (_STIXBase, string_types)):
|
|
|
|
value = [value]
|
|
|
|
|
|
|
|
result = []
|
|
|
|
for item in value:
|
|
|
|
try:
|
|
|
|
valid = self.contained.clean(item)
|
|
|
|
except ValueError:
|
|
|
|
raise
|
|
|
|
except AttributeError:
|
|
|
|
# type of list has no clean() function (eg. built in Python types)
|
|
|
|
# TODO Should we raise an error here?
|
|
|
|
valid = item
|
|
|
|
|
|
|
|
if type(self.contained) is EmbeddedObjectProperty:
|
|
|
|
obj_type = self.contained.type
|
2019-06-15 00:10:38 +02:00
|
|
|
elif type(self.contained).__name__ == "STIXObjectProperty":
|
2018-07-10 20:50:03 +02:00
|
|
|
# ^ this way of checking doesn't require a circular import
|
|
|
|
# valid is already an instance of a python-stix2 class; no need
|
|
|
|
# to turn it into a dictionary and then pass it to the class
|
|
|
|
# constructor again
|
|
|
|
result.append(valid)
|
|
|
|
continue
|
|
|
|
elif type(self.contained) is DictionaryProperty:
|
|
|
|
obj_type = dict
|
|
|
|
else:
|
|
|
|
obj_type = self.contained
|
|
|
|
|
|
|
|
if isinstance(valid, collections.Mapping):
|
|
|
|
result.append(obj_type(**valid))
|
|
|
|
else:
|
|
|
|
result.append(obj_type(valid))
|
|
|
|
|
|
|
|
# STIX spec forbids empty lists
|
|
|
|
if len(result) < 1:
|
|
|
|
raise ValueError("must not be empty.")
|
|
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
2019-01-09 16:46:48 +01:00
|
|
|
class CallableValues(list):
|
|
|
|
"""Wrapper to allow `values()` method on WindowsRegistryKey objects.
|
|
|
|
Needed because `values` is also a property.
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, parent_instance, *args, **kwargs):
|
|
|
|
self.parent_instance = parent_instance
|
|
|
|
super(CallableValues, self).__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
def __call__(self):
|
|
|
|
return _Observable.values(self.parent_instance)
|
|
|
|
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
class StringProperty(Property):
|
|
|
|
|
|
|
|
def __init__(self, **kwargs):
|
|
|
|
super(StringProperty, self).__init__(**kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
2019-04-23 15:27:21 +02:00
|
|
|
if not isinstance(value, string_types):
|
|
|
|
return text_type(value)
|
|
|
|
return value
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
|
|
|
|
class TypeProperty(Property):
|
|
|
|
|
|
|
|
def __init__(self, type):
|
|
|
|
super(TypeProperty, self).__init__(fixed=type)
|
|
|
|
|
|
|
|
|
|
|
|
class IDProperty(Property):
|
|
|
|
|
2019-06-21 19:18:51 +02:00
|
|
|
def __init__(self, type, spec_version=stix2.DEFAULT_VERSION):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.required_prefix = type + "--"
|
2019-06-13 02:19:47 +02:00
|
|
|
self.spec_version = spec_version
|
2018-07-10 20:50:03 +02:00
|
|
|
super(IDProperty, self).__init__()
|
|
|
|
|
|
|
|
def clean(self, value):
|
2019-06-14 00:37:21 +02:00
|
|
|
_validate_id(value, self.spec_version, self.required_prefix)
|
2018-07-10 20:50:03 +02:00
|
|
|
return value
|
|
|
|
|
|
|
|
def default(self):
|
|
|
|
return self.required_prefix + str(uuid.uuid4())
|
|
|
|
|
|
|
|
|
|
|
|
class IntegerProperty(Property):
|
|
|
|
|
2018-10-15 21:02:59 +02:00
|
|
|
def __init__(self, min=None, max=None, **kwargs):
|
|
|
|
self.min = min
|
|
|
|
self.max = max
|
|
|
|
super(IntegerProperty, self).__init__(**kwargs)
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
def clean(self, value):
|
|
|
|
try:
|
2018-10-15 21:02:59 +02:00
|
|
|
value = int(value)
|
2018-07-10 20:50:03 +02:00
|
|
|
except Exception:
|
|
|
|
raise ValueError("must be an integer.")
|
|
|
|
|
2018-10-15 21:02:59 +02:00
|
|
|
if self.min is not None and value < self.min:
|
|
|
|
msg = "minimum value is {}. received {}".format(self.min, value)
|
|
|
|
raise ValueError(msg)
|
|
|
|
|
|
|
|
if self.max is not None and value > self.max:
|
|
|
|
msg = "maximum value is {}. received {}".format(self.max, value)
|
|
|
|
raise ValueError(msg)
|
|
|
|
|
|
|
|
return value
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
class FloatProperty(Property):
|
2018-07-05 18:39:44 +02:00
|
|
|
|
2018-10-15 21:02:59 +02:00
|
|
|
def __init__(self, min=None, max=None, **kwargs):
|
|
|
|
self.min = min
|
|
|
|
self.max = max
|
|
|
|
super(FloatProperty, self).__init__(**kwargs)
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
def clean(self, value):
|
|
|
|
try:
|
2018-10-15 21:02:59 +02:00
|
|
|
value = float(value)
|
2018-07-10 20:50:03 +02:00
|
|
|
except Exception:
|
|
|
|
raise ValueError("must be a float.")
|
|
|
|
|
2018-10-15 21:02:59 +02:00
|
|
|
if self.min is not None and value < self.min:
|
|
|
|
msg = "minimum value is {}. received {}".format(self.min, value)
|
|
|
|
raise ValueError(msg)
|
|
|
|
|
|
|
|
if self.max is not None and value > self.max:
|
|
|
|
msg = "maximum value is {}. received {}".format(self.max, value)
|
|
|
|
raise ValueError(msg)
|
|
|
|
|
|
|
|
return value
|
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
class BooleanProperty(Property):
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
if isinstance(value, bool):
|
|
|
|
return value
|
|
|
|
|
2018-07-12 20:31:14 +02:00
|
|
|
trues = ['true', 't', '1']
|
|
|
|
falses = ['false', 'f', '0']
|
2018-07-10 20:50:03 +02:00
|
|
|
try:
|
|
|
|
if value.lower() in trues:
|
|
|
|
return True
|
|
|
|
if value.lower() in falses:
|
|
|
|
return False
|
|
|
|
except AttributeError:
|
|
|
|
if value == 1:
|
|
|
|
return True
|
|
|
|
if value == 0:
|
|
|
|
return False
|
|
|
|
|
|
|
|
raise ValueError("must be a boolean value.")
|
|
|
|
|
|
|
|
|
|
|
|
class TimestampProperty(Property):
|
|
|
|
|
|
|
|
def __init__(self, precision=None, **kwargs):
|
|
|
|
self.precision = precision
|
|
|
|
super(TimestampProperty, self).__init__(**kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
return parse_into_datetime(value, self.precision)
|
|
|
|
|
|
|
|
|
|
|
|
class DictionaryProperty(Property):
|
|
|
|
|
2019-06-21 19:18:51 +02:00
|
|
|
def __init__(self, spec_version=stix2.DEFAULT_VERSION, **kwargs):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.spec_version = spec_version
|
|
|
|
super(DictionaryProperty, self).__init__(**kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
try:
|
|
|
|
dictified = _get_dict(value)
|
|
|
|
except ValueError:
|
|
|
|
raise ValueError("The dictionary property must contain a dictionary")
|
|
|
|
for k in dictified.keys():
|
|
|
|
if self.spec_version == '2.0':
|
|
|
|
if len(k) < 3:
|
|
|
|
raise DictionaryKeyError(k, "shorter than 3 characters")
|
|
|
|
elif len(k) > 256:
|
|
|
|
raise DictionaryKeyError(k, "longer than 256 characters")
|
|
|
|
elif self.spec_version == '2.1':
|
|
|
|
if len(k) > 250:
|
|
|
|
raise DictionaryKeyError(k, "longer than 250 characters")
|
2018-10-17 13:34:15 +02:00
|
|
|
if not re.match(r"^[a-zA-Z0-9_-]+$", k):
|
2018-07-13 17:10:05 +02:00
|
|
|
msg = (
|
|
|
|
"contains characters other than lowercase a-z, "
|
|
|
|
"uppercase A-Z, numerals 0-9, hyphen (-), or "
|
|
|
|
"underscore (_)"
|
|
|
|
)
|
2018-07-10 20:50:03 +02:00
|
|
|
raise DictionaryKeyError(k, msg)
|
|
|
|
return dictified
|
|
|
|
|
|
|
|
|
|
|
|
HASHES_REGEX = {
|
2018-10-17 13:34:15 +02:00
|
|
|
"MD5": (r"^[a-fA-F0-9]{32}$", "MD5"),
|
|
|
|
"MD6": (r"^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"),
|
|
|
|
"RIPEMD160": (r"^[a-fA-F0-9]{40}$", "RIPEMD-160"),
|
|
|
|
"SHA1": (r"^[a-fA-F0-9]{40}$", "SHA-1"),
|
|
|
|
"SHA224": (r"^[a-fA-F0-9]{56}$", "SHA-224"),
|
|
|
|
"SHA256": (r"^[a-fA-F0-9]{64}$", "SHA-256"),
|
|
|
|
"SHA384": (r"^[a-fA-F0-9]{96}$", "SHA-384"),
|
|
|
|
"SHA512": (r"^[a-fA-F0-9]{128}$", "SHA-512"),
|
|
|
|
"SHA3224": (r"^[a-fA-F0-9]{56}$", "SHA3-224"),
|
|
|
|
"SHA3256": (r"^[a-fA-F0-9]{64}$", "SHA3-256"),
|
|
|
|
"SHA3384": (r"^[a-fA-F0-9]{96}$", "SHA3-384"),
|
|
|
|
"SHA3512": (r"^[a-fA-F0-9]{128}$", "SHA3-512"),
|
|
|
|
"SSDEEP": (r"^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"),
|
|
|
|
"WHIRLPOOL": (r"^[a-fA-F0-9]{128}$", "WHIRLPOOL"),
|
2018-07-10 20:50:03 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class HashesProperty(DictionaryProperty):
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
clean_dict = super(HashesProperty, self).clean(value)
|
|
|
|
for k, v in clean_dict.items():
|
|
|
|
key = k.upper().replace('-', '')
|
|
|
|
if key in HASHES_REGEX:
|
|
|
|
vocab_key = HASHES_REGEX[key][1]
|
|
|
|
if not re.match(HASHES_REGEX[key][0], v):
|
2018-07-12 20:31:14 +02:00
|
|
|
raise ValueError("'{0}' is not a valid {1} hash".format(v, vocab_key))
|
2018-07-10 20:50:03 +02:00
|
|
|
if k != vocab_key:
|
|
|
|
clean_dict[vocab_key] = clean_dict[k]
|
|
|
|
del clean_dict[k]
|
|
|
|
return clean_dict
|
|
|
|
|
|
|
|
|
|
|
|
class BinaryProperty(Property):
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
try:
|
|
|
|
base64.b64decode(value)
|
|
|
|
except (binascii.Error, TypeError):
|
|
|
|
raise ValueError("must contain a base64 encoded string")
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
class HexProperty(Property):
|
|
|
|
|
|
|
|
def clean(self, value):
|
2018-10-17 13:34:15 +02:00
|
|
|
if not re.match(r"^([a-fA-F0-9]{2})+$", value):
|
2018-07-10 20:50:03 +02:00
|
|
|
raise ValueError("must contain an even number of hexadecimal characters")
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
class ReferenceProperty(Property):
|
|
|
|
|
2019-08-29 23:15:51 +02:00
|
|
|
def __init__(self, valid_types=None, invalid_types=None, spec_version=stix2.DEFAULT_VERSION, **kwargs):
|
2018-07-10 20:50:03 +02:00
|
|
|
"""
|
|
|
|
references sometimes must be to a specific object type
|
|
|
|
"""
|
2019-06-13 02:19:47 +02:00
|
|
|
self.spec_version = spec_version
|
2019-08-27 23:36:45 +02:00
|
|
|
|
2019-08-29 23:15:51 +02:00
|
|
|
# These checks need to be done prior to the STIX object finishing construction
|
|
|
|
# and thus we can't use base.py's _check_mutually_exclusive_properties()
|
|
|
|
# in the typical location of _check_object_constraints() in sdo.py
|
|
|
|
if valid_types and invalid_types:
|
|
|
|
raise MutuallyExclusivePropertiesError(self.__class__, ['invalid_types', 'valid_types'])
|
|
|
|
elif valid_types is None and invalid_types is None:
|
|
|
|
raise MissingPropertiesError(self.__class__, ['invalid_types', 'valid_types'])
|
|
|
|
|
2019-08-27 23:36:45 +02:00
|
|
|
if valid_types and type(valid_types) is not list:
|
|
|
|
valid_types = [valid_types]
|
2019-08-29 23:15:51 +02:00
|
|
|
elif invalid_types and type(invalid_types) is not list:
|
|
|
|
invalid_types = [invalid_types]
|
|
|
|
|
2019-08-27 23:36:45 +02:00
|
|
|
self.valid_types = valid_types
|
2019-08-29 23:15:51 +02:00
|
|
|
self.invalid_types = invalid_types
|
2019-08-27 23:36:45 +02:00
|
|
|
|
2018-07-05 18:39:44 +02:00
|
|
|
super(ReferenceProperty, self).__init__(**kwargs)
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
if isinstance(value, _STIXBase):
|
|
|
|
value = value.id
|
|
|
|
value = str(value)
|
2019-06-13 02:19:47 +02:00
|
|
|
|
2019-08-29 23:15:51 +02:00
|
|
|
possible_prefix = value[:value.index('--') + 2]
|
|
|
|
|
|
|
|
if self.valid_types:
|
2019-08-30 09:47:47 +02:00
|
|
|
if self.valid_types == ["only_SDO"]:
|
|
|
|
self.valid_types = STIX2_OBJ_MAPS['v21']['objects'].keys()
|
|
|
|
elif self.valid_types == ["only_SCO"]:
|
|
|
|
self.valid_types = STIX2_OBJ_MAPS['v21']['observables'].keys()
|
|
|
|
elif self.valid_types == ["only_SCO_&_SRO"]:
|
2019-09-07 00:08:27 +02:00
|
|
|
self.valid_types = list(STIX2_OBJ_MAPS['v21']['observables'].keys()) + ['relationship', 'sighting']
|
2019-08-30 09:47:47 +02:00
|
|
|
|
|
|
|
if possible_prefix[:-2] in self.valid_types:
|
2019-08-29 23:15:51 +02:00
|
|
|
required_prefix = possible_prefix
|
|
|
|
else:
|
2019-09-06 06:25:42 +02:00
|
|
|
raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix))
|
2019-08-29 23:15:51 +02:00
|
|
|
elif self.invalid_types:
|
2019-08-30 09:47:47 +02:00
|
|
|
if possible_prefix[:-2] not in self.invalid_types:
|
2019-08-29 23:15:51 +02:00
|
|
|
required_prefix = possible_prefix
|
|
|
|
else:
|
2019-09-06 06:25:42 +02:00
|
|
|
raise ValueError("An invalid type-specifying prefix '%s' was specified for this property" % (possible_prefix, value))
|
2019-08-27 23:36:45 +02:00
|
|
|
|
|
|
|
_validate_id(value, self.spec_version, required_prefix)
|
2019-06-13 02:19:47 +02:00
|
|
|
|
2018-07-10 20:50:03 +02:00
|
|
|
return value
|
|
|
|
|
|
|
|
|
2018-10-17 13:34:15 +02:00
|
|
|
SELECTOR_REGEX = re.compile(r"^[a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*$")
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
|
|
|
|
class SelectorProperty(Property):
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
if not SELECTOR_REGEX.match(value):
|
|
|
|
raise ValueError("must adhere to selector syntax.")
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
class ObjectReferenceProperty(StringProperty):
|
|
|
|
|
|
|
|
def __init__(self, valid_types=None, **kwargs):
|
|
|
|
if valid_types and type(valid_types) is not list:
|
|
|
|
valid_types = [valid_types]
|
|
|
|
self.valid_types = valid_types
|
|
|
|
super(ObjectReferenceProperty, self).__init__(**kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
class EmbeddedObjectProperty(Property):
|
|
|
|
|
2018-07-05 18:39:44 +02:00
|
|
|
def __init__(self, type, **kwargs):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.type = type
|
2018-07-05 18:39:44 +02:00
|
|
|
super(EmbeddedObjectProperty, self).__init__(**kwargs)
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
if type(value) is dict:
|
|
|
|
value = self.type(**value)
|
|
|
|
elif not isinstance(value, self.type):
|
2018-07-12 20:31:14 +02:00
|
|
|
raise ValueError("must be of type {}.".format(self.type.__name__))
|
2018-07-10 20:50:03 +02:00
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
class EnumProperty(StringProperty):
|
|
|
|
|
|
|
|
def __init__(self, allowed, **kwargs):
|
|
|
|
if type(allowed) is not list:
|
|
|
|
allowed = list(allowed)
|
|
|
|
self.allowed = allowed
|
|
|
|
super(EnumProperty, self).__init__(**kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
2019-04-23 15:27:21 +02:00
|
|
|
cleaned_value = super(EnumProperty, self).clean(value)
|
|
|
|
if cleaned_value not in self.allowed:
|
|
|
|
raise ValueError("value '{}' is not valid for this enumeration.".format(cleaned_value))
|
|
|
|
|
|
|
|
return cleaned_value
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
|
|
|
|
class PatternProperty(StringProperty):
|
|
|
|
|
|
|
|
def clean(self, value):
|
2019-04-23 15:27:21 +02:00
|
|
|
cleaned_value = super(PatternProperty, self).clean(value)
|
|
|
|
errors = run_validator(cleaned_value)
|
2018-07-10 20:50:03 +02:00
|
|
|
if errors:
|
|
|
|
raise ValueError(str(errors[0]))
|
|
|
|
|
2019-04-23 15:27:21 +02:00
|
|
|
return cleaned_value
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
|
|
|
|
class ObservableProperty(Property):
|
|
|
|
"""Property for holding Cyber Observable Objects.
|
|
|
|
"""
|
|
|
|
|
2019-06-21 19:18:51 +02:00
|
|
|
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.allow_custom = allow_custom
|
|
|
|
self.spec_version = spec_version
|
|
|
|
super(ObservableProperty, self).__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
try:
|
|
|
|
dictified = _get_dict(value)
|
|
|
|
# get deep copy since we are going modify the dict and might
|
|
|
|
# modify the original dict as _get_dict() does not return new
|
|
|
|
# dict when passed a dict
|
|
|
|
dictified = copy.deepcopy(dictified)
|
|
|
|
except ValueError:
|
|
|
|
raise ValueError("The observable property must contain a dictionary")
|
|
|
|
if dictified == {}:
|
|
|
|
raise ValueError("The observable property must contain a non-empty dictionary")
|
|
|
|
|
|
|
|
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
|
|
|
|
|
|
|
|
for key, obj in dictified.items():
|
2018-07-13 17:10:05 +02:00
|
|
|
parsed_obj = parse_observable(
|
|
|
|
obj,
|
|
|
|
valid_refs,
|
|
|
|
allow_custom=self.allow_custom,
|
|
|
|
version=self.spec_version,
|
|
|
|
)
|
2018-07-10 20:50:03 +02:00
|
|
|
dictified[key] = parsed_obj
|
|
|
|
|
|
|
|
return dictified
|
|
|
|
|
|
|
|
|
|
|
|
class ExtensionsProperty(DictionaryProperty):
|
|
|
|
"""Property for representing extensions on Observable objects.
|
|
|
|
"""
|
|
|
|
|
2019-06-21 19:18:51 +02:00
|
|
|
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, enclosing_type=None, required=False):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.allow_custom = allow_custom
|
|
|
|
self.enclosing_type = enclosing_type
|
|
|
|
super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required)
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
try:
|
|
|
|
dictified = _get_dict(value)
|
|
|
|
# get deep copy since we are going modify the dict and might
|
|
|
|
# modify the original dict as _get_dict() does not return new
|
|
|
|
# dict when passed a dict
|
|
|
|
dictified = copy.deepcopy(dictified)
|
|
|
|
except ValueError:
|
|
|
|
raise ValueError("The extensions property must contain a dictionary")
|
|
|
|
|
|
|
|
v = 'v' + self.spec_version.replace('.', '')
|
|
|
|
|
2018-11-28 17:21:27 +01:00
|
|
|
specific_type_map = STIX2_OBJ_MAPS[v]['observable-extensions'].get(self.enclosing_type, {})
|
|
|
|
for key, subvalue in dictified.items():
|
|
|
|
if key in specific_type_map:
|
|
|
|
cls = specific_type_map[key]
|
|
|
|
if type(subvalue) is dict:
|
|
|
|
if self.allow_custom:
|
|
|
|
subvalue['allow_custom'] = True
|
|
|
|
dictified[key] = cls(**subvalue)
|
2018-07-10 20:50:03 +02:00
|
|
|
else:
|
2018-11-28 17:21:27 +01:00
|
|
|
dictified[key] = cls(**subvalue)
|
|
|
|
elif type(subvalue) is cls:
|
|
|
|
# If already an instance of an _Extension class, assume it's valid
|
|
|
|
dictified[key] = subvalue
|
2018-07-10 20:50:03 +02:00
|
|
|
else:
|
2018-11-28 17:21:27 +01:00
|
|
|
raise ValueError("Cannot determine extension type.")
|
|
|
|
else:
|
2019-08-26 23:10:54 +02:00
|
|
|
if self.allow_custom:
|
|
|
|
dictified[key] = subvalue
|
|
|
|
else:
|
|
|
|
raise CustomContentError("Can't parse unknown extension type: {}".format(key))
|
2018-07-10 20:50:03 +02:00
|
|
|
return dictified
|
|
|
|
|
|
|
|
|
|
|
|
class STIXObjectProperty(Property):
|
|
|
|
|
2019-06-21 19:18:51 +02:00
|
|
|
def __init__(self, spec_version=stix2.DEFAULT_VERSION, allow_custom=False, *args, **kwargs):
|
2018-07-10 20:50:03 +02:00
|
|
|
self.allow_custom = allow_custom
|
|
|
|
self.spec_version = spec_version
|
|
|
|
super(STIXObjectProperty, self).__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
def clean(self, value):
|
|
|
|
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
|
|
|
|
# a bundle with no further checks.
|
|
|
|
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
|
|
|
|
for x in get_class_hierarchy_names(value)):
|
|
|
|
# A simple "is this a spec version 2.1+ object" test. For now,
|
|
|
|
# limit 2.0 bundles to 2.0 objects. It's not possible yet to
|
|
|
|
# have validation co-constraints among properties, e.g. have
|
|
|
|
# validation here depend on the value of another property
|
|
|
|
# (spec_version). So this is a hack, and not technically spec-
|
|
|
|
# compliant.
|
|
|
|
if 'spec_version' in value and self.spec_version == '2.0':
|
2018-07-13 17:10:05 +02:00
|
|
|
raise ValueError(
|
|
|
|
"Spec version 2.0 bundles don't yet support "
|
|
|
|
"containing objects of a different spec "
|
|
|
|
"version.",
|
|
|
|
)
|
2018-07-10 20:50:03 +02:00
|
|
|
return value
|
|
|
|
try:
|
|
|
|
dictified = _get_dict(value)
|
|
|
|
except ValueError:
|
|
|
|
raise ValueError("This property may only contain a dictionary or object")
|
|
|
|
if dictified == {}:
|
|
|
|
raise ValueError("This property may only contain a non-empty dictionary or object")
|
|
|
|
if 'type' in dictified and dictified['type'] == 'bundle':
|
|
|
|
raise ValueError("This property may not contain a Bundle object")
|
|
|
|
if 'spec_version' in dictified and self.spec_version == '2.0':
|
|
|
|
# See above comment regarding spec_version.
|
2018-07-13 17:10:05 +02:00
|
|
|
raise ValueError(
|
|
|
|
"Spec version 2.0 bundles don't yet support "
|
|
|
|
"containing objects of a different spec version.",
|
|
|
|
)
|
2018-07-10 20:50:03 +02:00
|
|
|
|
|
|
|
parsed_obj = parse(dictified, allow_custom=self.allow_custom)
|
|
|
|
|
|
|
|
return parsed_obj
|