Add new constrains parameters to IntegerProperty and FloatProperty

New constraints on timestamps, integer and floats for many objects
stix2.1
Emmanuelle Vargas-Gonzalez 2018-10-15 15:02:59 -04:00
parent acd86c80dd
commit dec75082df
5 changed files with 158 additions and 67 deletions

View File

@ -143,7 +143,7 @@ class ListProperty(Property):
if type(self.contained) is EmbeddedObjectProperty:
obj_type = self.contained.type
elif type(self.contained).__name__ is 'STIXObjectProperty':
elif type(self.contained).__name__ is "STIXObjectProperty":
# ^ this way of checking doesn't require a circular import
# valid is already an instance of a python-stix2 class; no need
# to turn it into a dictionary and then pass it to the class
@ -202,21 +202,51 @@ class IDProperty(Property):
class IntegerProperty(Property):
def __init__(self, min=None, max=None, **kwargs):
self.min = min
self.max = max
super(IntegerProperty, self).__init__(**kwargs)
def clean(self, value):
try:
return int(value)
value = int(value)
except Exception:
raise ValueError("must be an integer.")
if self.min is not None and value < self.min:
msg = "minimum value is {}. received {}".format(self.min, value)
raise ValueError(msg)
if self.max is not None and value > self.max:
msg = "maximum value is {}. received {}".format(self.max, value)
raise ValueError(msg)
return value
class FloatProperty(Property):
def __init__(self, min=None, max=None, **kwargs):
self.min = min
self.max = max
super(FloatProperty, self).__init__(**kwargs)
def clean(self, value):
try:
return float(value)
value = float(value)
except Exception:
raise ValueError("must be a float.")
if self.min is not None and value < self.min:
msg = "minimum value is {}. received {}".format(self.min, value)
raise ValueError(msg)
if self.max is not None and value > self.max:
msg = "maximum value is {}. received {}".format(self.max, value)
raise ValueError(msg)
return value
class BooleanProperty(Property):
@ -272,7 +302,7 @@ class DictionaryProperty(Property):
elif self.spec_version == '2.1':
if len(k) > 250:
raise DictionaryKeyError(k, "longer than 250 characters")
if not re.match('^[a-zA-Z0-9_-]+$', k):
if not re.match("^[a-zA-Z0-9_-]+$", k):
msg = (
"contains characters other than lowercase a-z, "
"uppercase A-Z, numerals 0-9, hyphen (-), or "
@ -329,7 +359,7 @@ class BinaryProperty(Property):
class HexProperty(Property):
def clean(self, value):
if not re.match('^([a-fA-F0-9]{2})+$', value):
if not re.match("^([a-fA-F0-9]{2})+$", value):
raise ValueError("must contain an even number of hexadecimal characters")
return value

View File

@ -7,8 +7,8 @@ from ..custom import _custom_marking_builder
from ..markings import _MarkingsMixin
from ..properties import (
BooleanProperty, DictionaryProperty, HashesProperty, IDProperty,
ListProperty, Property, ReferenceProperty, SelectorProperty,
StringProperty, TimestampProperty, TypeProperty,
IntegerProperty, ListProperty, Property, ReferenceProperty,
SelectorProperty, StringProperty, TimestampProperty, TypeProperty,
)
from ..utils import NOW, _get_dict
@ -82,6 +82,7 @@ class LanguageContent(_STIXBase):
('contents', DictionaryProperty(spec_version='2.1', required=True)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type='marking-definition'))),
('granular_markings', ListProperty(GranularMarking)),

View File

@ -171,7 +171,6 @@ class ArchiveExt(_Extension):
_type = 'archive-ext'
_properties = OrderedDict([
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
('version', StringProperty()),
('comment', StringProperty()),
])
@ -229,7 +228,6 @@ class RasterImageExt(_Extension):
('image_height', IntegerProperty()),
('image_width', IntegerProperty()),
('bits_per_pixel', IntegerProperty()),
('image_compression_algorithm', StringProperty()),
('exif_tags', DictionaryProperty(spec_version='2.1')),
])
@ -244,9 +242,9 @@ class WindowsPEOptionalHeaderType(_STIXBase):
('magic_hex', HexProperty()),
('major_linker_version', IntegerProperty()),
('minor_linker_version', IntegerProperty()),
('size_of_code', IntegerProperty()),
('size_of_initialized_data', IntegerProperty()),
('size_of_uninitialized_data', IntegerProperty()),
('size_of_code', IntegerProperty(min=0)),
('size_of_initialized_data', IntegerProperty(min=0)),
('size_of_uninitialized_data', IntegerProperty(min=0)),
('address_of_entry_point', IntegerProperty()),
('base_of_code', IntegerProperty()),
('base_of_data', IntegerProperty()),
@ -260,13 +258,13 @@ class WindowsPEOptionalHeaderType(_STIXBase):
('major_subsystem_version', IntegerProperty()),
('minor_subsystem_version', IntegerProperty()),
('win32_version_value_hex', HexProperty()),
('size_of_image', IntegerProperty()),
('size_of_headers', IntegerProperty()),
('size_of_image', IntegerProperty(min=0)),
('size_of_headers', IntegerProperty(min=0)),
('checksum_hex', HexProperty()),
('subsystem_hex', HexProperty()),
('dll_characteristics_hex', HexProperty()),
('size_of_stack_reserve', IntegerProperty()),
('size_of_stack_commit', IntegerProperty()),
('size_of_stack_reserve', IntegerProperty(min=0)),
('size_of_stack_commit', IntegerProperty(min=0)),
('size_of_heap_reserve', IntegerProperty()),
('size_of_heap_commit', IntegerProperty()),
('loader_flags_hex', HexProperty()),
@ -287,7 +285,7 @@ class WindowsPESection(_STIXBase):
_properties = OrderedDict([
('name', StringProperty(required=True)),
('size', IntegerProperty()),
('size', IntegerProperty(min=0)),
('entropy', FloatProperty()),
('hashes', HashesProperty(spec_version='2.1')),
])
@ -304,11 +302,11 @@ class WindowsPEBinaryExt(_Extension):
('pe_type', StringProperty(required=True)), # open_vocab
('imphash', StringProperty()),
('machine_hex', HexProperty()),
('number_of_sections', IntegerProperty()),
('number_of_sections', IntegerProperty(min=0)),
('time_date_stamp', TimestampProperty(precision='second')),
('pointer_to_symbol_table_hex', HexProperty()),
('number_of_symbols', IntegerProperty()),
('size_of_optional_header', IntegerProperty()),
('number_of_symbols', IntegerProperty(min=0)),
('size_of_optional_header', IntegerProperty(min=0)),
('characteristics_hex', HexProperty()),
('file_header_hashes', HashesProperty(spec_version='2.1')),
('optional_header', EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType)),
@ -326,7 +324,7 @@ class File(_Observable):
_properties = OrderedDict([
('type', TypeProperty(_type)),
('hashes', HashesProperty(spec_version='2.1')),
('size', IntegerProperty()),
('size', IntegerProperty(min=0)),
('name', StringProperty()),
('name_enc', StringProperty()),
('magic_number_hex', HexProperty()),
@ -480,7 +478,7 @@ class SocketExt(_Extension):
"SOCK_SEQPACKET",
]),
),
('socket_descriptor', IntegerProperty()),
('socket_descriptor', IntegerProperty(min=0)),
('socket_handle', IntegerProperty()),
])
@ -512,13 +510,13 @@ class NetworkTraffic(_Observable):
('is_active', BooleanProperty()),
('src_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])),
('dst_ref', ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name'])),
('src_port', IntegerProperty()),
('dst_port', IntegerProperty()),
('src_port', IntegerProperty(min=0, max=65535)),
('dst_port', IntegerProperty(min=0, max=65535)),
('protocols', ListProperty(StringProperty, required=True)),
('src_byte_count', IntegerProperty()),
('dst_byte_count', IntegerProperty()),
('src_packets', IntegerProperty()),
('dst_packets', IntegerProperty()),
('src_byte_count', IntegerProperty(min=0)),
('dst_byte_count', IntegerProperty(min=0)),
('src_packets', IntegerProperty(min=0)),
('dst_packets', IntegerProperty(min=0)),
('ipfix', DictionaryProperty(spec_version='2.1')),
('src_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
('dst_payload_ref', ObjectReferenceProperty(valid_types='artifact')),
@ -531,6 +529,22 @@ class NetworkTraffic(_Observable):
super(NetworkTraffic, self)._check_object_constraints()
self._check_at_least_one_property(['src_ref', 'dst_ref'])
start = self.get('start')
end = self.get('end')
is_active = self.get('is_active')
if end and is_active is not False:
msg = "{0.id} 'is_active' must be False if 'end' is present"
raise ValueError(msg.format(self))
if end and is_active is True:
msg = "{0.id} if 'is_active' is True, 'end' must not be included"
raise ValueError(msg.format(self))
if start and end and end <= start:
msg = "{0.id} 'end' must be greater than 'start'"
raise ValueError(msg.format(self))
class WindowsProcessExt(_Extension):
# TODO: Add link
@ -546,6 +560,14 @@ class WindowsProcessExt(_Extension):
('owner_sid', StringProperty()),
('window_title', StringProperty()),
('startup_info', DictionaryProperty(spec_version='2.1')),
(
'integrity_level', EnumProperty(allowed=[
"low",
"medium",
"high",
"system",
])
)
])
@ -604,11 +626,9 @@ class Process(_Observable):
('type', TypeProperty(_type)),
('is_hidden', BooleanProperty()),
('pid', IntegerProperty()),
('name', StringProperty()),
# this is not the created timestamps of the object itself
('created', TimestampProperty()),
('cwd', StringProperty()),
('arguments', ListProperty(StringProperty)),
('command_line', StringProperty()),
('environment_variables', DictionaryProperty(spec_version='2.1')),
('opened_connection_refs', ListProperty(ObjectReferenceProperty(valid_types='network-traffic'))),

View File

@ -2,7 +2,6 @@
from collections import OrderedDict
import itertools
from math import fabs
from ..core import STIXDomainObject
from ..custom import _custom_object_builder
@ -71,6 +70,16 @@ class Campaign(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)),
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
if first_seen and last_seen and last_seen < first_seen:
msg = "{0.id} 'last_seen' must be greater than or equal 'first_seen'"
raise ValueError(msg.format(self))
class CourseOfAction(STIXDomainObject):
# TODO: Add link
@ -114,6 +123,7 @@ class Identity(STIXDomainObject):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('roles', ListProperty(StringProperty)),
('identity_class', StringProperty(required=True)),
('sectors', ListProperty(StringProperty)),
('contact_information', StringProperty()),
@ -142,8 +152,8 @@ class Indicator(STIXDomainObject):
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty()),
('indicator_types', ListProperty(StringProperty, required=True)),
('description', StringProperty()),
('indicator_types', ListProperty(StringProperty, required=True)),
('pattern', PatternProperty(required=True)),
('valid_from', TimestampProperty(default=lambda: NOW)),
('valid_until', TimestampProperty()),
@ -157,6 +167,16 @@ class Indicator(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)),
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
valid_from = self.get('valid_from')
valid_until = self.get('valid_until')
if valid_from and valid_until and valid_until <= valid_from:
msg = "{0.id} 'valid_until' must be greater than 'valid_from'"
raise ValueError(msg.format(self))
class IntrusionSet(STIXDomainObject):
# TODO: Add link
@ -190,6 +210,16 @@ class IntrusionSet(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)),
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
if first_seen and last_seen and last_seen < first_seen:
msg = "{0.id} 'last_seen' must be greater than or equal to 'first_seen'"
raise ValueError(msg.format(self))
class Location(STIXDomainObject):
# TODO: Add link
@ -206,9 +236,9 @@ class Location(STIXDomainObject):
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('description', StringProperty()),
('latitude', FloatProperty()),
('longitude', FloatProperty()),
('precision', FloatProperty()),
('latitude', FloatProperty(min=-90.0, max=90.0)),
('longitude', FloatProperty(min=-180.0, max=180.0)),
('precision', FloatProperty(min=0.0)),
('region', StringProperty()),
('country', StringProperty()),
('administrative_area', StringProperty()),
@ -225,31 +255,13 @@ class Location(STIXDomainObject):
])
def _check_object_constraints(self):
super(Location, self)._check_object_constraints()
super(self.__class__, self)._check_object_constraints()
if self.get('precision') is not None:
self._check_properties_dependency(['longitude', 'latitude'], ['precision'])
if self.precision < 0.0:
msg = (
"{0.id} 'precision' must be a positive value. Received "
"{0.precision}"
)
raise ValueError(msg.format(self))
self._check_properties_dependency(['latitude'], ['longitude'])
if self.get('latitude') is not None and fabs(self.latitude) > 90.0:
msg = (
"{0.id} 'latitude' must be between -90 and 90. Received "
"{0.latitude}"
)
raise ValueError(msg.format(self))
if self.get('longitude') is not None and fabs(self.longitude) > 180.0:
msg = (
"{0.id} 'longitude' must be between -180 and 180. Received "
"{0.longitude}"
)
raise ValueError(msg.format(self))
self._check_properties_dependency(['longitude'], ['latitude'])
class Malware(STIXDomainObject):
@ -267,8 +279,8 @@ class Malware(STIXDomainObject):
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('malware_types', ListProperty(StringProperty, required=True)),
('description', StringProperty()),
('malware_types', ListProperty(StringProperty, required=True)),
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
@ -294,8 +306,8 @@ class Note(STIXDomainObject):
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('summary', StringProperty()),
('description', StringProperty(required=True)),
('abstract', StringProperty()),
('content', StringProperty(required=True)),
('authors', ListProperty(StringProperty)),
('object_refs', ListProperty(ReferenceProperty, required=True)),
('revoked', BooleanProperty(default=lambda: False)),
@ -324,7 +336,7 @@ class ObservedData(STIXDomainObject):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_observed', TimestampProperty(required=True)),
('last_observed', TimestampProperty(required=True)),
('number_observed', IntegerProperty(required=True)),
('number_observed', IntegerProperty(min=1, max=999999999, required=True)),
('objects', ObservableProperty(spec_version='2.1', required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
@ -341,6 +353,20 @@ class ObservedData(STIXDomainObject):
super(ObservedData, self).__init__(*args, **kwargs)
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
if self.get('number_observed', 1) == 1:
self._check_properties_dependency(['first_observed'], ['last_observed'])
self._check_properties_dependency(['last_observed'], ['first_observed'])
first_observed = self.get('first_observed')
last_observed = self.get('last_observed')
if first_observed and last_observed and last_observed < first_observed:
msg = "{0.id} 'last_observed' must be greater than or equal to 'first_observed'"
raise ValueError(msg.format(self))
class Opinion(STIXDomainObject):
# TODO: Add link
@ -356,7 +382,7 @@ class Opinion(STIXDomainObject):
('created_by_ref', ReferenceProperty(type='identity')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('description', StringProperty()),
('explanation', StringProperty()),
('authors', ListProperty(StringProperty)),
('object_refs', ListProperty(ReferenceProperty, required=True)),
(
@ -395,8 +421,8 @@ class Report(STIXDomainObject):
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('report_types', ListProperty(StringProperty, required=True)),
('description', StringProperty()),
('report_types', ListProperty(StringProperty, required=True)),
('published', TimestampProperty(required=True)),
('object_refs', ListProperty(ReferenceProperty, required=True)),
('revoked', BooleanProperty(default=lambda: False)),
@ -424,8 +450,8 @@ class ThreatActor(STIXDomainObject):
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('threat_actor_types', ListProperty(StringProperty, required=True)),
('description', StringProperty()),
('threat_actor_types', ListProperty(StringProperty, required=True)),
('aliases', ListProperty(StringProperty)),
('roles', ListProperty(StringProperty)),
('goals', ListProperty(StringProperty)),
@ -459,8 +485,8 @@ class Tool(STIXDomainObject):
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('tool_types', ListProperty(StringProperty, required=True)),
('description', StringProperty()),
('tool_types', ListProperty(StringProperty, required=True)),
('kill_chain_phases', ListProperty(KillChainPhase)),
('tool_version', StringProperty()),
('revoked', BooleanProperty(default=lambda: False)),

View File

@ -56,8 +56,12 @@ class Relationship(STIXRelationshipObject):
super(Relationship, self).__init__(**kwargs)
def _check_object_constraints(self):
super(Relationship, self)._check_object_constraints()
if self.get('start_time') and self.get('stop_time') and (self.start_time > self.stop_time):
super(self.__class__, self)._check_object_constraints()
start_time = self.get('start_time')
stop_time = self.get('stop_time')
if start_time and stop_time and stop_time <= start_time:
msg = "{0.id} 'stop_time' must be later than 'start_time'"
raise ValueError(msg.format(self))
@ -78,7 +82,7 @@ class Sighting(STIXRelationshipObject):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty()),
('count', IntegerProperty(min=0, max=999999999)),
('sighting_of_ref', ReferenceProperty(required=True)),
('observed_data_refs', ListProperty(ReferenceProperty(type='observed-data'))),
('where_sighted_refs', ListProperty(ReferenceProperty(type='identity'))),
@ -99,3 +103,13 @@ class Sighting(STIXRelationshipObject):
kwargs['sighting_of_ref'] = sighting_of_ref
super(Sighting, self).__init__(**kwargs)
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
if first_seen and last_seen and last_seen <= first_seen:
msg = "{0.id} 'last_seen' must be later than 'first_seen'"
raise ValueError(msg.format(self))