parse() handles observables in 2.1. Change mechanism for (in)valid_types in ReferenceProperty. Fix _custom_observable_builder to include ReferenceProperty instead of ObjectReferenceProperty, and added ID property to custom observables

master
Desai, Kartikey H 2019-11-06 10:11:12 -05:00 committed by Chris Lenk
parent aee296ea46
commit 3a46d42aaa
13 changed files with 123 additions and 82 deletions

View File

@ -8,7 +8,7 @@ import re
import stix2
from .base import _STIXBase
from .exceptions import CustomContentError, ParseError
from .exceptions import ParseError
from .markings import _MarkingsMixin
from .utils import _get_dict
@ -109,7 +109,7 @@ def dict_to_stix2(stix_dict, allow_custom=False, version=None):
# '2.0' representation.
v = 'v20'
OBJ_MAP = STIX2_OBJ_MAPS[v]['objects']
OBJ_MAP = dict(STIX2_OBJ_MAPS[v]['objects'], **STIX2_OBJ_MAPS[v]['observables'])
try:
obj_class = OBJ_MAP[stix_dict['type']]
@ -166,8 +166,7 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
return obj
raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " "use the CustomObservable decorator." % obj['type'])
return obj_class(allow_custom=allow_custom, **obj)

View File

@ -67,19 +67,34 @@ def _custom_observable_builder(cls, type, properties, version):
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
# Check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties:
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop)
or 'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained))):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
if version == "2.0":
# If using STIX2.0, check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties:
if prop_name.endswith('_ref') and ('ObjectReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained))):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
)
else:
# If using STIX2.1 (or newer...), check properties ending in "_ref/s" are ReferenceProperties
for prop_name, prop in properties:
if prop_name.endswith('_ref') and ('ReferenceProperty' not in get_class_hierarchy_names(prop)):
raise ValueError(
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained))):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,
)
_type = type
_properties = OrderedDict(properties)

View File

@ -454,22 +454,19 @@ class ReferenceProperty(Property):
value = value.id
value = str(value)
possible_prefix = value[:value.index('--') + 2]
possible_prefix = value[:value.index('--')]
if self.valid_types:
if self.valid_types == ["only_SDO"]:
self.valid_types = STIX2_OBJ_MAPS['v21']['objects'].keys()
elif self.valid_types == ["only_SCO"]:
self.valid_types = STIX2_OBJ_MAPS['v21']['observables'].keys()
elif self.valid_types == ["only_SCO_&_SRO"]:
self.valid_types = list(STIX2_OBJ_MAPS['v21']['observables'].keys()) + ['relationship', 'sighting']
ref_valid_types = enumerate_types(self.valid_types, 'v' + self.spec_version.replace(".", ""))
if possible_prefix[:-2] in self.valid_types:
if possible_prefix in ref_valid_types:
required_prefix = possible_prefix
else:
raise ValueError("The type-specifying prefix '%s' for this property is not valid" % (possible_prefix))
elif self.invalid_types:
if possible_prefix[:-2] not in self.invalid_types:
ref_invalid_types = enumerate_types(self.invalid_types, 'v' + self.spec_version.replace(".", ""))
if possible_prefix not in ref_invalid_types:
required_prefix = possible_prefix
else:
raise ValueError("An invalid type-specifying prefix '%s' was specified for this property" % (possible_prefix, value))
@ -479,6 +476,23 @@ class ReferenceProperty(Property):
return value
def enumerate_types(types, spec_version):
return_types = []
return_types += types
if "SDO" in types:
return_types.remove("SDO")
return_types += STIX2_OBJ_MAPS[spec_version]['objects'].keys()
if "SCO" in types:
return_types.remove("SCO")
return_types += STIX2_OBJ_MAPS[spec_version]['observables'].keys()
if "SRO" in types:
return_types.remove("SRO")
return_types += ['relationship', 'sighting']
return return_types
SELECTOR_REGEX = re.compile(r"^[a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*$")

View File

@ -583,7 +583,7 @@ def test_parse_unregistered_custom_observable_object():
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.CustomContentError) as excinfo:
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string, version='2.0')
assert "Can't parse unknown observable type" in str(excinfo.value)

View File

@ -29,7 +29,8 @@ def test_encode_json_object():
def test_deterministic_id_unicode():
mutex = {'name': u'D*Fl#Ed*\u00a3\u00a8', 'type': 'mutex'}
obs = stix2.parse_observable(mutex, version="2.1")
# CHANGED-parse_observable
obs = stix2.parse(mutex, version="2.1")
dd_idx = obs.id.index("--")
id_uuid = uuid.UUID(obs.id[dd_idx+2:])

View File

@ -508,7 +508,7 @@ def test_custom_observable_object_invalid_ref_property():
)
class NewObs():
pass
assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value)
assert "is named like a reference property but is not a ReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_property():
@ -520,7 +520,7 @@ def test_custom_observable_object_invalid_refs_property():
)
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
assert "is named like a reference list property but is not a ListProperty containing ReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_list_property():
@ -532,26 +532,26 @@ def test_custom_observable_object_invalid_refs_list_property():
)
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
assert "is named like a reference list property but is not a ListProperty containing ReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_valid_refs():
@stix2.v21.CustomObservable(
'x-new-obs', [
('property1', stix2.properties.StringProperty(required=True)),
('property_ref', stix2.properties.ObjectReferenceProperty(valid_types='email-addr')),
],
)
class NewObs():
pass
# def test_custom_observable_object_invalid_valid_refs():
# @stix2.v21.CustomObservable(
# 'x-new-obs', [
# ('property1', stix2.properties.StringProperty(required=True)),
# ('property_ref', stix2.properties.ReferenceProperty(valid_types='email-addr')),
# ],
# )
# class NewObs():
# pass
with pytest.raises(Exception) as excinfo:
NewObs(
_valid_refs=['1'],
property1='something',
property_ref='1',
)
assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value)
# with pytest.raises(Exception) as excinfo:
# NewObs(
# _valid_refs=['1'],
# property1='something',
# property_ref='1',
# )
# assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value)
def test_custom_no_properties_raises_exception():
@ -575,8 +575,8 @@ def test_parse_custom_observable_object():
"type": "x-new-observable",
"property1": "something"
}"""
nt = stix2.parse_observable(nt_string, [], version='2.1')
# CHANGED-parse_observable
nt = stix2.parse(nt_string, [], version='2.1')
assert isinstance(nt, stix2.base._STIXBase)
assert nt.property1 == 'something'
@ -587,11 +587,12 @@ def test_parse_unregistered_custom_observable_object():
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.CustomContentError) as excinfo:
stix2.parse_observable(nt_string, version='2.1')
assert "Can't parse unknown observable type" in str(excinfo.value)
parsed_custom = stix2.parse_observable(nt_string, allow_custom=True, version='2.1')
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
# CHANGED-parse_observable
stix2.parse(nt_string, version='2.1')
assert "Can't parse unknown object type" in str(excinfo.value)
# CHANGED-parse_observable
parsed_custom = stix2.parse(nt_string, allow_custom=True, version='2.1')
assert parsed_custom['property1'] == 'something'
with pytest.raises(AttributeError) as excinfo:
assert parsed_custom.property1 == 'something'
@ -604,8 +605,9 @@ def test_parse_unregistered_custom_observable_object_with_no_type():
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string, allow_custom=True, version='2.1')
assert "Can't parse observable with no 'type' property" in str(excinfo.value)
# CHANGED-parse_observable
stix2.parse(nt_string, allow_custom=True, version='2.1')
assert "Can't parse object with no 'type' property" in str(excinfo.value)
def test_parse_observed_data_with_custom_observable():
@ -634,8 +636,9 @@ def test_parse_invalid_custom_observable_object():
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string, version='2.1')
assert "Can't parse observable with no 'type' property" in str(excinfo.value)
# CHANGED-parse_observable
stix2.parse(nt_string, version='2.1')
assert "Can't parse object with no 'type' property" in str(excinfo.value)
def test_observable_custom_property():
@ -885,8 +888,8 @@ def test_parse_observable_with_custom_extension():
}
}
}"""
parsed = stix2.parse_observable(input_str, version='2.1')
# CHANGED-parse_observable
parsed = stix2.parse(input_str, version='2.1')
assert parsed.extensions['x-new-ext'].property2 == 12
@ -961,10 +964,11 @@ def test_custom_and_spec_extension_mix():
)
def test_parse_observable_with_unregistered_custom_extension(data):
with pytest.raises(InvalidValueError) as excinfo:
stix2.parse_observable(data, version='2.1')
# CHANGED-parse_observable
stix2.parse(data, version='2.1')
assert "Can't parse unknown extension type" in str(excinfo.value)
parsed_ob = stix2.parse_observable(data, allow_custom=True, version='2.1')
# CHANGED-parse_observable
parsed_ob = stix2.parse(data, allow_custom=True, version='2.1')
assert parsed_ob['extensions']['x-foobar-ext']['property1'] == 'foo'
assert not isinstance(parsed_ob['extensions']['x-foobar-ext'], stix2.base._STIXBase)

View File

@ -209,7 +209,7 @@ def test_observed_data_example_with_bad_refs():
assert excinfo.value.cls == stix2.v21.Directory
assert excinfo.value.prop_name == "contains_refs"
assert "The type-specifying prefix 'monkey--' for this property is not valid" in excinfo.value.reason
assert "The type-specifying prefix 'monkey' for this property is not valid" in excinfo.value.reason
def test_observed_data_example_with_non_dictionary():
@ -369,7 +369,8 @@ def test_parse_autonomous_system_valid(data):
],
)
def test_parse_email_address(data):
odata = stix2.parse_observable(data, version='2.1')
# CHANGED-parse_observable
odata = stix2.parse(data, version='2.1')
assert odata.type == "email-addr"
odata_str = re.compile(
@ -378,7 +379,8 @@ def test_parse_email_address(data):
'"belongs_to_ref": "mutex--9be6365f-b89c-48c0-9340-6953f6595718"', data,
)
with pytest.raises(stix2.exceptions.InvalidValueError):
stix2.parse_observable(odata_str, version='2.1')
# CHANGED-parse_observable
stix2.parse(odata_str, version='2.1')
@pytest.mark.parametrize(
@ -424,7 +426,8 @@ def test_parse_email_address(data):
],
)
def test_parse_email_message(data):
odata = stix2.parse_observable(data, version='2.1')
# CHANGED-parse_observable
odata = stix2.parse(data, version='2.1')
assert odata.type == "email-message"
assert odata.body_multipart[0].content_disposition == "inline"
@ -446,7 +449,8 @@ def test_parse_email_message(data):
)
def test_parse_email_message_not_multipart(data):
with pytest.raises(stix2.exceptions.DependentPropertiesError) as excinfo:
stix2.parse_observable(data, version='2.1')
# CHANGED-parse_observable
stix2.parse(data, version='2.1')
assert excinfo.value.cls == stix2.v21.EmailMessage
assert excinfo.value.dependencies == [("is_multipart", "body")]
@ -548,7 +552,8 @@ def test_parse_file_archive(data):
)
def test_parse_email_message_with_at_least_one_error(data):
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.parse_observable(data, version='2.1')
# CHANGED-parse_observable
stix2.parse(data, version='2.1')
assert excinfo.value.cls == stix2.v21.EmailMessage
assert "At least one of the" in str(excinfo.value)
@ -570,7 +575,8 @@ def test_parse_email_message_with_at_least_one_error(data):
],
)
def test_parse_basic_tcp_traffic(data):
odata = stix2.parse_observable(
# CHANGED-parse_observable
odata = stix2.parse(
data, version='2.1',
)
@ -602,7 +608,8 @@ def test_parse_basic_tcp_traffic(data):
)
def test_parse_basic_tcp_traffic_with_error(data):
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.parse_observable(data, version='2.1')
# CHANGED-parse_observable
stix2.parse(data, version='2.1')
assert excinfo.value.cls == stix2.v21.NetworkTraffic
assert excinfo.value.properties == ["dst_ref", "src_ref"]

View File

@ -233,7 +233,7 @@ class Report(STIXDomainObject):
('name', StringProperty(required=True)),
('description', StringProperty()),
('published', TimestampProperty(required=True)),
('object_refs', ListProperty(ReferenceProperty(invalid_types=[""], spec_version='2.0'), required=True)),
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.0'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),

View File

@ -67,7 +67,7 @@ class Sighting(STIXRelationshipObject):
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty(min=0, max=999999999)),
('sighting_of_ref', ReferenceProperty(valid_types="only_SDO", spec_version='2.0', required=True)),
('sighting_of_ref', ReferenceProperty(valid_types="SDO", spec_version='2.0', required=True)),
('observed_data_refs', ListProperty(ReferenceProperty(valid_types='observed-data', spec_version='2.0'))),
('where_sighted_refs', ListProperty(ReferenceProperty(valid_types='identity', spec_version='2.0'))),
('summary', BooleanProperty(default=lambda: False)),

View File

@ -76,7 +76,7 @@ class LanguageContent(_STIXBase):
('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('object_ref', ReferenceProperty(invalid_types=[""], spec_version='2.1', required=True)),
('object_ref', ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1', required=True)),
# TODO: 'object_modified' it MUST be an exact match for the modified time of the STIX Object (SRO or SDO) being referenced.
('object_modified', TimestampProperty(precision='millisecond')),
# TODO: 'contents' https://docs.google.com/document/d/1ShNq4c3e1CkfANmD9O--mdZ5H0O_GLnjN28a_yrEaco/edit#heading=h.cfz5hcantmvx

View File

@ -385,7 +385,7 @@ class File(_Observable):
('mtime', TimestampProperty()),
('atime', TimestampProperty()),
('parent_directory_ref', ReferenceProperty(valid_types='directory', spec_version='2.1')),
('contains_refs', ListProperty(ReferenceProperty(invalid_types=[""], spec_version='2.1'))),
('contains_refs', ListProperty(ReferenceProperty(valid_types=["SCO"], spec_version='2.1'))),
('content_ref', ReferenceProperty(valid_types='artifact', spec_version='2.1')),
('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=_type)),
('spec_version', StringProperty(fixed='2.1')),
@ -1028,6 +1028,7 @@ def CustomObservable(type='x-custom-observable', properties=None):
def wrapper(cls):
_properties = list(itertools.chain.from_iterable([
[('type', TypeProperty(type))],
[('id', IDProperty(type, spec_version='2.1'))],
properties,
[('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))],
]))

View File

@ -149,7 +149,7 @@ class Grouping(STIXDomainObject):
('name', StringProperty()),
('description', StringProperty()),
('context', StringProperty(required=True)),
('object_refs', ListProperty(ReferenceProperty(invalid_types=[""], spec_version='2.1'), required=True)),
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
])
@ -512,7 +512,7 @@ class MalwareAnalysis(STIXDomainObject):
('analysis_started', TimestampProperty()),
('analysis_ended', TimestampProperty()),
('av_result', StringProperty()),
('analysis_sco_refs', ListProperty(ReferenceProperty(valid_types="only_SCO", spec_version='2.1'))),
('analysis_sco_refs', ListProperty(ReferenceProperty(valid_types="SCO", spec_version='2.1'))),
])
def _check_object_constraints(self):
@ -538,7 +538,7 @@ class Note(STIXDomainObject):
('abstract', StringProperty()),
('content', StringProperty(required=True)),
('authors', ListProperty(StringProperty)),
('object_refs', ListProperty(ReferenceProperty(invalid_types=[""], spec_version='2.1'), required=True)),
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
@ -567,7 +567,7 @@ class ObservedData(STIXDomainObject):
('last_observed', TimestampProperty(required=True)),
('number_observed', IntegerProperty(min=1, max=999999999, required=True)),
('objects', ObservableProperty(spec_version='2.1')),
('object_refs', ListProperty(ReferenceProperty(valid_types="only_SCO_&_SRO", spec_version="2.1"))),
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SRO"], spec_version="2.1"))),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
@ -632,7 +632,7 @@ class Opinion(STIXDomainObject):
], required=True,
),
),
('object_refs', ListProperty(ReferenceProperty(invalid_types=[""], spec_version='2.1'), required=True)),
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
@ -661,7 +661,7 @@ class Report(STIXDomainObject):
('description', StringProperty()),
('report_types', ListProperty(StringProperty, required=True)),
('published', TimestampProperty(required=True)),
('object_refs', ListProperty(ReferenceProperty(invalid_types=[""], spec_version='2.1'), required=True)),
('object_refs', ListProperty(ReferenceProperty(valid_types=["SCO", "SDO", "SRO"], spec_version='2.1'), required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),

View File

@ -86,7 +86,7 @@ class Sighting(STIXRelationshipObject):
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty(min=0, max=999999999)),
('sighting_of_ref', ReferenceProperty(valid_types="only_SDO", spec_version='2.1', required=True)),
('sighting_of_ref', ReferenceProperty(valid_types="SDO", spec_version='2.1', required=True)),
('observed_data_refs', ListProperty(ReferenceProperty(valid_types='observed-data', spec_version='2.1'))),
('where_sighted_refs', ListProperty(ReferenceProperty(valid_types='identity', spec_version='2.1'))),
('summary', BooleanProperty()),