cti-python-stix2/stix2/properties.py

565 lines
20 KiB
Python

"""Classes for representing properties of STIX Objects and Cyber Observables."""
import base64
import binascii
import collections
import copy
import inspect
import re
import uuid
from six import string_types, text_type
from stix2patterns.validator import run_validator
from .base import _STIXBase
from .core import STIX2_OBJ_MAPS, parse, parse_observable
from .exceptions import CustomContentError, DictionaryKeyError
from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime
# STIX identifier pattern: "<object-type>--<UUID>".
#
# The UUID part uses the regular expression for a RFC 4122, Version 4 UUID.
# In the 8-4-4-4-12 hexadecimal representation, the first hex digit of the
# third component must be a 4, and the first hex digit of the fourth
# component must be 8, 9, a, or b (10xx bit pattern). Hex digits may be in
# either case.
#
# The object-type part must be at least three characters drawn from lowercase
# letters, digits and hyphens, and may neither start nor end with a hyphen.
ID_REGEX = re.compile(
r"^[a-z0-9][a-z0-9-]+[a-z0-9]--" # object type
"[0-9a-fA-F]{8}-"
"[0-9a-fA-F]{4}-"
"4[0-9a-fA-F]{3}-"
"[89abAB][0-9a-fA-F]{3}-"
"[0-9a-fA-F]{12}$",
)
# Error message raised (as a ValueError) when a value does not match ID_REGEX.
ERROR_INVALID_ID = (
"not a valid STIX identifier, must match <object-type>--<UUIDv4>"
)
class Property(object):
    """Represent a property of STIX data type.

    Subclasses can define the following attributes as keyword arguments to
    ``__init__()``.

    Args:
        required (bool): If ``True``, the property must be provided when
            creating an object with that property. No default value exists for
            these properties. (Default: ``False``)
        fixed: This provides a constant default value. Users are free to
            provide this value explicitly when constructing an object (which
            allows you to copy **all** values from an existing object to a new
            object), but if the user provides a value other than the ``fixed``
            value, it will raise an error. This is semantically equivalent to
            defining both:

            - a ``clean()`` function that checks if the value matches the fixed
              value, and
            - a ``default()`` function that returns the fixed value.

            ``None`` means "no fixed value"; any other value, including falsy
            ones such as ``0``, ``False`` or ``""``, is enforced.
        default: A callable taking no arguments that produces the default
            value. ``None`` means "no default supplied".

    Subclasses can also define the following functions:

    - ``def clean(self, value) -> any:``
        - Return a value that is valid for this property. If ``value`` is not
          valid for this property, this will attempt to transform it first. If
          ``value`` is not valid and no such transformation is possible, it
          should raise a ValueError.
    - ``def default(self):``
        - provide a default value for this property.
        - ``default()`` can return the special value ``NOW`` to use the current
            time. This is useful when several timestamps in the same object
            need to use the same default value, so calling now() for each
            property-- likely several microseconds apart-- does not work.

    Subclasses can instead provide a lambda function for ``default`` as a
    keyword argument. ``clean`` should not be provided as a lambda since
    lambdas cannot raise their own exceptions.

    When instantiating Properties, ``required`` and ``default`` should not be
    used together. ``default`` implies that the property is required in the
    specification so this function will be used to supply a value if none is
    provided. ``required`` means that the user must provide this; it is
    required in the specification and we can't or don't want to create a
    default value.
    """

    def _default_clean(self, value):
        """Accept ``value`` only when it equals the fixed value.

        Raises:
            ValueError: If ``value`` differs from the fixed value.
        """
        if value != self._fixed_value:
            raise ValueError("must equal '{}'.".format(self._fixed_value))
        return value

    def __init__(self, required=False, fixed=None, default=None):
        self.required = required
        # Test against None rather than truthiness: a falsy constant such as
        # 0, False or "" is still a legitimate fixed value and must not be
        # silently dropped.
        if fixed is not None:
            self._fixed_value = fixed
            # Instance attributes shadow the class-level clean()/default().
            self.clean = self._default_clean
            self.default = lambda: fixed
        if default is not None:
            self.default = default

    def clean(self, value):
        """Return ``value`` unchanged; subclasses override this to validate."""
        return value

    def __call__(self, value=None):
        """Used by ListProperty to handle lists that have been defined with
        either a class or an instance.
        """
        return value
class ListProperty(Property):
    """Property holding a non-empty list whose items are handled by ``contained``."""

    def __init__(self, contained, **kwargs):
        """
        ``contained`` should be a function which returns an object from the value.
        """
        if inspect.isclass(contained) and issubclass(contained, Property):
            # If it's a class and not an instance, instantiate it so that
            # clean() can be called on it, and ListProperty.clean() will
            # use __call__ when it appends the item.
            self.contained = contained()
        else:
            self.contained = contained
        super(ListProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Clean every item of ``value`` and return the results as a list.

        A single string or ``_STIXBase`` instance is treated as a one-item
        list.

        Raises:
            ValueError: If ``value`` is not iterable, if an item is rejected
                by the contained property, or if the resulting list is empty.
        """
        # The ``collections.Mapping`` alias was removed in Python 3.10; the
        # ABC lives in ``collections.abc``. Fall back for Python 2.
        try:
            from collections.abc import Mapping
        except ImportError:
            from collections import Mapping

        try:
            iter(value)
        except TypeError:
            raise ValueError("must be an iterable.")

        if isinstance(value, (_STIXBase, string_types)):
            value = [value]

        contained_type = type(self.contained)
        result = []
        for item in value:
            try:
                # A ValueError from clean() propagates to the caller as-is.
                valid = self.contained.clean(item)
            except AttributeError:
                # type of list has no clean() function (eg. built in Python types)
                # TODO Should we raise an error here?
                valid = item

            if contained_type is EmbeddedObjectProperty:
                obj_type = self.contained.type
            elif contained_type.__name__ == "STIXObjectProperty":
                # ^ this way of checking doesn't require a circular import.
                # The name is compared with ``==``: identity (``is``) on
                # strings only works by accident of interning.
                # valid is already an instance of a python-stix2 class; no need
                # to turn it into a dictionary and then pass it to the class
                # constructor again
                result.append(valid)
                continue
            elif contained_type is DictionaryProperty:
                obj_type = dict
            else:
                obj_type = self.contained

            if isinstance(valid, Mapping):
                result.append(obj_type(**valid))
            else:
                result.append(obj_type(valid))

        # STIX spec forbids empty lists
        if not result:
            raise ValueError("must not be empty.")
        return result
class StringProperty(Property):
    """Property whose value is coerced to the text type."""

    def __init__(self, **kwargs):
        # Conversion callable applied by clean(); also used by subclasses.
        self.string_type = text_type
        super(StringProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Return ``value`` converted with ``self.string_type``."""
        to_text = self.string_type
        return to_text(value)
class TypeProperty(Property):
    """Property whose value is fixed to the given ``type``."""

    def __init__(self, type):
        # The type is passed to Property as the ``fixed`` value.
        fixed_value = type
        super(TypeProperty, self).__init__(fixed=fixed_value)
class IDProperty(Property):
    """Property for an identifier of the form ``<type>--<UUIDv4>``."""

    def __init__(self, type):
        # Every valid identifier must begin with "<type>--".
        self.required_prefix = type + "--"
        super(IDProperty, self).__init__()

    def clean(self, value):
        """Return ``value`` if it has the required prefix and matches ID_REGEX.

        Raises:
            ValueError: If the prefix is wrong or the identifier is malformed.
        """
        prefix = self.required_prefix
        if not value.startswith(prefix):
            raise ValueError("must start with '{}'.".format(prefix))
        if ID_REGEX.match(value) is None:
            raise ValueError(ERROR_INVALID_ID)
        return value

    def default(self):
        """Return a new identifier built from the prefix and a random UUID4."""
        fresh_uuid = str(uuid.uuid4())
        return self.required_prefix + fresh_uuid
class IntegerProperty(Property):
    """Integer property with optional inclusive ``min``/``max`` bounds."""

    def __init__(self, min=None, max=None, **kwargs):
        self.min = min
        self.max = max
        super(IntegerProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Convert ``value`` with ``int()`` and check it against the bounds.

        Raises:
            ValueError: If the conversion fails or a bound is violated.
        """
        try:
            number = int(value)
        except Exception:
            raise ValueError("must be an integer.")

        lower, upper = self.min, self.max
        if lower is not None and number < lower:
            raise ValueError("minimum value is {}. received {}".format(lower, number))
        if upper is not None and number > upper:
            raise ValueError("maximum value is {}. received {}".format(upper, number))
        return number
class FloatProperty(Property):
    """Float property with optional inclusive ``min``/``max`` bounds."""

    def __init__(self, min=None, max=None, **kwargs):
        self.min = min
        self.max = max
        super(FloatProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Convert ``value`` with ``float()`` and check it against the bounds.

        Raises:
            ValueError: If the conversion fails or a bound is violated.
        """
        try:
            number = float(value)
        except Exception:
            raise ValueError("must be a float.")

        lower, upper = self.min, self.max
        if lower is not None and number < lower:
            raise ValueError("minimum value is {}. received {}".format(lower, number))
        if upper is not None and number > upper:
            raise ValueError("maximum value is {}. received {}".format(upper, number))
        return number
class BooleanProperty(Property):
    """Property accepting booleans and a few common textual/numeric spellings."""

    def clean(self, value):
        """Return the boolean represented by ``value``.

        Real booleans are returned as-is. Values with a ``lower()`` method are
        matched case-insensitively against 'true'/'t'/'1' and
        'false'/'f'/'0'; other values are compared with 1 and 0.

        Raises:
            ValueError: If ``value`` matches none of the accepted forms.
        """
        if isinstance(value, bool):
            return value

        try:
            lowered = value.lower()
        except AttributeError:
            # Not string-like: fall back to numeric comparison.
            if value == 1:
                return True
            if value == 0:
                return False
        else:
            if lowered in ('true', 't', '1'):
                return True
            if lowered in ('false', 'f', '0'):
                return False

        raise ValueError("must be a boolean value.")
class TimestampProperty(Property):
    """Property whose value is parsed with ``parse_into_datetime``."""

    def __init__(self, precision=None, **kwargs):
        # Forwarded unchanged to parse_into_datetime() by clean().
        self.precision = precision
        super(TimestampProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Return ``parse_into_datetime(value, self.precision)``."""
        parsed = parse_into_datetime(value, self.precision)
        return parsed
class DictionaryProperty(Property):
    """Dictionary property whose keys are validated per STIX spec version."""

    def __init__(self, spec_version='2.0', **kwargs):
        self.spec_version = spec_version
        super(DictionaryProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Convert ``value`` with ``_get_dict`` and validate every key.

        For spec version '2.0' keys must be 3 to 256 characters long; for
        '2.1' at most 250. In every case keys may contain only ASCII letters,
        digits, hyphens and underscores.

        Raises:
            ValueError: If ``_get_dict`` rejects ``value``.
            DictionaryKeyError: If a key breaks a length or character rule.
        """
        try:
            dictified = _get_dict(value)
        except ValueError:
            raise ValueError("The dictionary property must contain a dictionary")

        version = self.spec_version
        for key in dictified.keys():
            key_length = len(key)
            if version == '2.0':
                if key_length < 3:
                    raise DictionaryKeyError(key, "shorter than 3 characters")
                if key_length > 256:
                    raise DictionaryKeyError(key, "longer than 256 characters")
            elif version == '2.1' and key_length > 250:
                raise DictionaryKeyError(key, "longer than 250 characters")

            if re.match(r"^[a-zA-Z0-9_-]+$", key) is None:
                msg = (
                    "contains characters other than lowercase a-z, "
                    "uppercase A-Z, numerals 0-9, hyphen (-), or "
                    "underscore (_)"
                )
                raise DictionaryKeyError(key, msg)

        return dictified
# Maps a normalized hash algorithm name (upper-cased, hyphens removed) to a
# pair of (regular expression the hash value must match, vocabulary spelling
# of the algorithm name).
HASHES_REGEX = {
    "MD5": (r"^[a-fA-F0-9]{32}$", "MD5"),
    # The alternatives are grouped so that ``^`` and ``$`` anchor every one of
    # them; ungrouped, ``|`` binds more loosely than the anchors and a valid
    # prefix followed by arbitrary text would be accepted.
    "MD6": (r"^(?:[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128})$", "MD6"),
    "RIPEMD160": (r"^[a-fA-F0-9]{40}$", "RIPEMD-160"),
    "SHA1": (r"^[a-fA-F0-9]{40}$", "SHA-1"),
    "SHA224": (r"^[a-fA-F0-9]{56}$", "SHA-224"),
    "SHA256": (r"^[a-fA-F0-9]{64}$", "SHA-256"),
    "SHA384": (r"^[a-fA-F0-9]{96}$", "SHA-384"),
    "SHA512": (r"^[a-fA-F0-9]{128}$", "SHA-512"),
    "SHA3224": (r"^[a-fA-F0-9]{56}$", "SHA3-224"),
    "SHA3256": (r"^[a-fA-F0-9]{64}$", "SHA3-256"),
    "SHA3384": (r"^[a-fA-F0-9]{96}$", "SHA3-384"),
    "SHA3512": (r"^[a-fA-F0-9]{128}$", "SHA3-512"),
    "SSDEEP": (r"^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"),
    "WHIRLPOOL": (r"^[a-fA-F0-9]{128}$", "WHIRLPOOL"),
}
class HashesProperty(DictionaryProperty):
    """Dictionary property mapping hash algorithm names to hash values."""

    def clean(self, value):
        """Validate known hash values and normalize their algorithm names.

        After the DictionaryProperty key checks, each key is normalized
        (upper-cased, hyphens removed) and looked up in HASHES_REGEX. For a
        known algorithm the value must match its pattern, and the key is
        renamed to the vocabulary spelling. Unknown algorithms are left
        untouched.

        Raises:
            ValueError: If a value does not match its algorithm's pattern.
        """
        clean_dict = super(HashesProperty, self).clean(value)
        # Iterate over a snapshot: keys are renamed in clean_dict below, and
        # adding/removing entries while iterating a live dict view is unsafe.
        for k, v in list(clean_dict.items()):
            key = k.upper().replace('-', '')
            if key not in HASHES_REGEX:
                continue
            pattern, vocab_key = HASHES_REGEX[key]
            if not re.match(pattern, v):
                raise ValueError("'{0}' is not a valid {1} hash".format(v, vocab_key))
            if k != vocab_key:
                clean_dict[vocab_key] = clean_dict[k]
                del clean_dict[k]
        return clean_dict
class BinaryProperty(Property):
    """Property holding base64-encoded data."""

    def clean(self, value):
        """Return ``value`` unchanged if ``base64.b64decode`` accepts it.

        The decoded bytes are discarded; decoding is only a validity check.

        Raises:
            ValueError: If decoding raises ``binascii.Error`` or ``TypeError``.
        """
        decode_failures = (binascii.Error, TypeError)
        try:
            base64.b64decode(value)
        except decode_failures:
            raise ValueError("must contain a base64 encoded string")
        return value
class HexProperty(Property):
    """Property holding a string of hexadecimal digit pairs."""

    def clean(self, value):
        """Return ``value`` if it consists of one or more pairs of hex digits.

        Raises:
            ValueError: If ``value`` does not match that pattern.
        """
        is_hex_pairs = re.match(r"^([a-fA-F0-9]{2})+$", value)
        if is_hex_pairs:
            return value
        raise ValueError("must contain an even number of hexadecimal characters")
class ReferenceProperty(Property):
    """Property holding the identifier of another STIX object."""

    def __init__(self, type=None, **kwargs):
        """
        references sometimes must be to a specific object type
        """
        self.type = type
        super(ReferenceProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Return the referenced identifier as a ``str``.

        A ``_STIXBase`` instance is replaced by its ``id``. When a type was
        configured, the identifier must start with it; in every case it must
        match ID_REGEX.

        Raises:
            ValueError: If the prefix check or the ID_REGEX check fails.
        """
        if isinstance(value, _STIXBase):
            value = value.id
        ref = str(value)

        expected_type = self.type
        if expected_type and not ref.startswith(expected_type):
            raise ValueError("must start with '{}'.".format(expected_type))
        if ID_REGEX.match(ref) is None:
            raise ValueError(ERROR_INVALID_ID)
        return ref
# Selector syntax: a first segment of 3-250 characters (lowercase letters,
# digits, underscore, hyphen), followed by any number of "."-separated
# segments, each either a list index such as "[0]" or a 1-250 character name.
SELECTOR_REGEX = re.compile(r"^[a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*$")


class SelectorProperty(Property):
    """Property holding a selector string."""

    def clean(self, value):
        """Return ``value`` if it matches SELECTOR_REGEX.

        Raises:
            ValueError: If ``value`` does not follow the selector syntax.
        """
        if SELECTOR_REGEX.match(value):
            return value
        raise ValueError("must adhere to selector syntax.")
class ObjectReferenceProperty(StringProperty):
    """String property that records which object types it may reference."""

    def __init__(self, valid_types=None, **kwargs):
        # A truthy value that is not exactly a list is wrapped in one; falsy
        # values (None, empty list, ...) and lists are stored unchanged.
        needs_wrapping = valid_types and type(valid_types) is not list
        self.valid_types = [valid_types] if needs_wrapping else valid_types
        super(ObjectReferenceProperty, self).__init__(**kwargs)
class EmbeddedObjectProperty(Property):
    """Property holding an instance of a given class, or a dict to build one."""

    def __init__(self, type, **kwargs):
        self.type = type
        super(EmbeddedObjectProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Return an instance of ``self.type``.

        A plain ``dict`` is expanded into keyword arguments for ``self.type``;
        an existing instance is returned unchanged.

        Raises:
            ValueError: If ``value`` is neither a dict nor such an instance.
        """
        expected = self.type
        if type(value) is dict:
            return expected(**value)
        if not isinstance(value, expected):
            raise ValueError("must be of type {}.".format(expected.__name__))
        return value
class EnumProperty(StringProperty):
    """String property restricted to a set of allowed values."""

    def __init__(self, allowed, **kwargs):
        # Anything that is not exactly a list is converted with list().
        self.allowed = allowed if type(allowed) is list else list(allowed)
        super(EnumProperty, self).__init__(**kwargs)

    def clean(self, value):
        """Return ``value`` as text if it is one of the allowed values.

        Raises:
            ValueError: If the text form of ``value`` is not allowed.
        """
        text = super(EnumProperty, self).clean(value)
        if text not in self.allowed:
            raise ValueError("value '{}' is not valid for this enumeration.".format(text))
        return self.string_type(text)
class PatternProperty(StringProperty):
    """String property whose value is checked with ``run_validator``."""

    def clean(self, value):
        """Return ``value`` as text if the validator reports no errors.

        Raises:
            ValueError: With the first reported error, if there is any.
        """
        pattern_text = super(PatternProperty, self).clean(value)
        problems = run_validator(pattern_text)
        if problems:
            first_problem = problems[0]
            raise ValueError(str(first_problem))
        return self.string_type(value)
class ObservableProperty(Property):
    """Property for holding Cyber Observable Objects.
    """

    def __init__(self, spec_version='2.0', allow_custom=False, *args, **kwargs):
        self.allow_custom = allow_custom
        self.spec_version = spec_version
        super(ObservableProperty, self).__init__(*args, **kwargs)

    def clean(self, value):
        """Parse every entry of ``value`` with ``parse_observable``.

        Returns a deep copy of the dictionary in which each value has been
        replaced by the result of ``parse_observable``.

        Raises:
            ValueError: If ``value`` cannot be turned into a dictionary or
                the dictionary is empty.
        """
        try:
            # Work on a deep copy: the entries are replaced below, and
            # _get_dict() does not return a new dict when passed a dict.
            dictified = copy.deepcopy(_get_dict(value))
        except ValueError:
            raise ValueError("The observable property must contain a dictionary")

        if dictified == {}:
            raise ValueError("The observable property must contain a non-empty dictionary")

        # Key -> 'type' of each entry, handed to parse_observable() for
        # every object.
        valid_refs = {ref_key: entry['type'] for ref_key, entry in dictified.items()}

        for key, obj in dictified.items():
            dictified[key] = parse_observable(
                obj,
                valid_refs,
                allow_custom=self.allow_custom,
                version=self.spec_version,
            )

        return dictified
class ExtensionsProperty(DictionaryProperty):
    """Property for representing extensions on Observable objects.
    """

    def __init__(self, spec_version='2.0', allow_custom=False, enclosing_type=None, required=False):
        self.allow_custom = allow_custom
        self.enclosing_type = enclosing_type
        super(ExtensionsProperty, self).__init__(spec_version=spec_version, required=required)

    def clean(self, value):
        """Turn each extension entry into an instance of its registered class.

        The classes come from ``STIX2_OBJ_MAPS`` for the configured spec
        version and enclosing type. A plain ``dict`` entry is passed to the
        class as keyword arguments; an entry that is already exactly of that
        class is kept.

        Raises:
            ValueError: If ``value`` cannot be turned into a dictionary, or
                an entry is neither a dict nor an instance of its class.
            CustomContentError: If an extension key has no registered class.
        """
        try:
            # Work on a deep copy: the entries are replaced below, and
            # _get_dict() does not return a new dict when passed a dict.
            dictified = copy.deepcopy(_get_dict(value))
        except ValueError:
            raise ValueError("The extensions property must contain a dictionary")

        v = 'v' + self.spec_version.replace('.', '')
        specific_type_map = STIX2_OBJ_MAPS[v]['observable-extensions'].get(self.enclosing_type, {})

        for key, subvalue in dictified.items():
            if key not in specific_type_map:
                raise CustomContentError("Can't parse unknown extension type: {}".format(key))

            cls = specific_type_map[key]
            if type(subvalue) is dict:
                if self.allow_custom:
                    subvalue['allow_custom'] = True
                dictified[key] = cls(**subvalue)
            elif type(subvalue) is cls:
                # If already an instance of an _Extension class, assume it's valid
                dictified[key] = subvalue
            else:
                raise ValueError("Cannot determine extension type.")

        return dictified
class STIXObjectProperty(Property):
    """Property holding one STIX object, given as an instance or a dictionary."""

    def __init__(self, spec_version='2.0', allow_custom=False, *args, **kwargs):
        self.allow_custom = allow_custom
        self.spec_version = spec_version
        super(STIXObjectProperty, self).__init__(*args, **kwargs)

    def clean(self, value):
        """Return ``value`` as a STIX object.

        Instances whose class hierarchy includes STIXDomainObject,
        STIXRelationshipObject or MarkingDefinition are returned as-is;
        anything else is converted with ``_get_dict`` and passed to
        ``parse``.

        Raises:
            ValueError: If ``value`` cannot be turned into a dictionary, is
                empty, is a bundle, or carries ``spec_version`` while this
                property is configured for spec version 2.0.
        """
        # Any STIX Object (SDO, SRO, or Marking Definition) can be added to
        # a bundle with no further checks.
        stix_object_names = ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
        is_stix_object = any(
            name in stix_object_names
            for name in get_class_hierarchy_names(value)
        )
        if is_stix_object:
            # A simple "is this a spec version 2.1+ object" test. For now,
            # limit 2.0 bundles to 2.0 objects. It's not possible yet to
            # have validation co-constraints among properties, e.g. have
            # validation here depend on the value of another property
            # (spec_version). So this is a hack, and not technically spec-
            # compliant.
            if 'spec_version' in value and self.spec_version == '2.0':
                raise ValueError(
                    "Spec version 2.0 bundles don't yet support "
                    "containing objects of a different spec "
                    "version.",
                )
            return value

        try:
            dictified = _get_dict(value)
        except ValueError:
            raise ValueError("This property may only contain a dictionary or object")

        if dictified == {}:
            raise ValueError("This property may only contain a non-empty dictionary or object")
        if 'type' in dictified and dictified['type'] == 'bundle':
            raise ValueError("This property may not contain a Bundle object")
        if 'spec_version' in dictified and self.spec_version == '2.0':
            # Same spec_version limitation as for object instances above.
            raise ValueError(
                "Spec version 2.0 bundles don't yet support "
                "containing objects of a different spec version.",
            )

        return parse(dictified, allow_custom=self.allow_custom)