Merge remote-tracking branch 'oasis/master' into datastores

stix2.1
Emmanuelle Vargas-Gonzalez 2017-09-01 10:13:57 -04:00
commit 6f36ea8488
13 changed files with 820 additions and 425 deletions

View File

@ -216,18 +216,14 @@ class _Observable(_STIXBase):
try:
allowed_types = prop.contained.valid_types
except AttributeError:
try:
allowed_types = prop.valid_types
except AttributeError:
raise ValueError("'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty or a ListProperty "
"containing ObjectReferenceProperty." % prop_name)
allowed_types = prop.valid_types
try:
ref_type = self._STIXBase__valid_refs[ref]
except TypeError:
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
if allowed_types:
try:
ref_type = self._STIXBase__valid_refs[ref]
except TypeError:
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
if ref_type not in allowed_types:
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))

View File

@ -29,16 +29,13 @@ class ObservableProperty(Property):
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary")
raise ValueError("The observable property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
# from .__init__ import parse_observable # avoid circular import
for key, obj in dictified.items():
parsed_obj = parse_observable(obj, valid_refs)
if not issubclass(type(parsed_obj), _Observable):
raise ValueError("Objects in an observable property must be "
"Cyber Observable Objects")
dictified[key] = parsed_obj
return dictified
@ -58,7 +55,7 @@ class ExtensionsProperty(DictionaryProperty):
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary")
raise ValueError("The extensions property must contain a non-empty dictionary")
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
@ -74,7 +71,7 @@ class ExtensionsProperty(DictionaryProperty):
else:
raise ValueError("The key used in the extensions dictionary is not an extension type name")
else:
raise ValueError("The enclosing type has no extensions defined")
raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type)
return dictified
@ -87,6 +84,7 @@ class Artifact(_Observable):
('payload_bin', BinaryProperty()),
('url', StringProperty()),
('hashes', HashesProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
def _check_object_constraints(self):
@ -103,6 +101,7 @@ class AutonomousSystem(_Observable):
('number', IntegerProperty()),
('name', StringProperty()),
('rir', StringProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -118,6 +117,7 @@ class Directory(_Observable):
('modified', TimestampProperty()),
('accessed', TimestampProperty()),
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory']))),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -128,6 +128,7 @@ class DomainName(_Observable):
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -139,6 +140,8 @@ class EmailAddress(_Observable):
('value', StringProperty(required=True)),
('display_name', StringProperty()),
('belongs_to_ref', ObjectReferenceProperty(valid_types='user-account')),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -175,6 +178,7 @@ class EmailMessage(_Observable):
('body', StringProperty()),
('body_multipart', ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent))),
('raw_email_ref', ObjectReferenceProperty(valid_types='artifact')),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
def _check_object_constraints(self):
@ -186,6 +190,7 @@ class EmailMessage(_Observable):
class ArchiveExt(_Extension):
_type = 'archive-ext'
_properties = OrderedDict()
_properties.update([
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
@ -204,6 +209,7 @@ class AlternateDataStream(_STIXBase):
class NTFSExt(_Extension):
_type = 'ntfs-ext'
_properties = OrderedDict()
_properties.update([
('sid', StringProperty()),
@ -212,6 +218,7 @@ class NTFSExt(_Extension):
class PDFExt(_Extension):
_type = 'pdf-ext'
_properties = OrderedDict()
_properties.update([
('version', StringProperty()),
@ -223,6 +230,7 @@ class PDFExt(_Extension):
class RasterImageExt(_Extension):
_type = 'raster-image-ext'
_properties = OrderedDict()
_properties.update([
('image_height', IntegerProperty()),
@ -285,6 +293,7 @@ class WindowsPESection(_STIXBase):
class WindowsPEBinaryExt(_Extension):
_type = 'windows-pebinary-ext'
_properties = OrderedDict()
_properties.update([
('pe_type', StringProperty(required=True)), # open_vocab
@ -340,6 +349,7 @@ class IPv4Address(_Observable):
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -351,6 +361,7 @@ class IPv6Address(_Observable):
('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
('belongs_to_refs', ListProperty(ObjectReferenceProperty(valid_types='autonomous-system'))),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -360,6 +371,7 @@ class MACAddress(_Observable):
_properties.update([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -369,10 +381,12 @@ class Mutex(_Observable):
_properties.update([
('type', TypeProperty(_type)),
('name', StringProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
class HTTPRequestExt(_Extension):
_type = 'http-request-ext'
_properties = OrderedDict()
_properties.update([
('request_method', StringProperty(required=True)),
@ -385,6 +399,7 @@ class HTTPRequestExt(_Extension):
class ICMPExt(_Extension):
_type = 'icmp-ext'
_properties = OrderedDict()
_properties.update([
('icmp_type_hex', HexProperty(required=True)),
@ -393,6 +408,7 @@ class ICMPExt(_Extension):
class SocketExt(_Extension):
_type = 'socket-ext'
_properties = OrderedDict()
_properties.update([
('address_family', EnumProperty([
@ -427,6 +443,7 @@ class SocketExt(_Extension):
class TCPExt(_Extension):
_type = 'tcp-ext'
_properties = OrderedDict()
_properties.update([
('src_flags_hex', HexProperty()),
@ -465,6 +482,7 @@ class NetworkTraffic(_Observable):
class WindowsProcessExt(_Extension):
_type = 'windows-process-ext'
_properties = OrderedDict()
_properties.update([
('aslr_enabled', BooleanProperty()),
@ -477,6 +495,7 @@ class WindowsProcessExt(_Extension):
class WindowsServiceExt(_Extension):
_type = 'windows-service-ext'
_properties = OrderedDict()
_properties.update([
('service_name', StringProperty(required=True)),
@ -556,6 +575,7 @@ class Software(_Observable):
('languages', ListProperty(StringProperty)),
('vendor', StringProperty()),
('version', StringProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -565,10 +585,12 @@ class URL(_Observable):
_properties.update([
('type', TypeProperty(_type)),
('value', StringProperty(required=True)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
class UNIXAccountExt(_Extension):
_type = 'unix-account-ext'
_properties = OrderedDict()
_properties.update([
('gid', IntegerProperty()),
@ -635,6 +657,7 @@ class WindowsRegistryKey(_Observable):
('modified', TimestampProperty()),
('creator_user_ref', ObjectReferenceProperty(valid_types='user-account')),
('number_of_subkeys', IntegerProperty()),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@property
@ -684,6 +707,7 @@ class X509Certificate(_Observable):
('subject_public_key_modulus', StringProperty()),
('subject_public_key_exponent', IntegerProperty()),
('x509_v3_extensions', EmbeddedObjectProperty(type=X509V3ExtenstionsType)),
('extensions', ExtensionsProperty(enclosing_type=_type)),
])
@ -708,39 +732,32 @@ OBJ_MAP_OBSERVABLE = {
'x509-certificate': X509Certificate,
}
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
'file': {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
},
'network-traffic': {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
},
'process': {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
},
'user-account': {
'unix-account-ext': UNIXAccountExt,
},
}
def parse_observable(data, _valid_refs, allow_custom=False):
def parse_observable(data, _valid_refs=None, allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable
object.
@ -756,19 +773,20 @@ def parse_observable(data, _valid_refs, allow_custom=False):
"""
obj = get_dict(data)
obj['_valid_refs'] = _valid_refs
obj['_valid_refs'] = _valid_refs or []
if 'type' not in obj:
raise ParseError("Can't parse object with no 'type' property: %s" % str(obj))
raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type'])
raise ParseError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
@ -807,6 +825,16 @@ def CustomObservable(type='x-custom-observable', properties=None):
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
# Check properties ending in "_ref/s" are ObjectReferenceProperties
for prop_name, prop in properties:
if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty):
raise ValueError("'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name)
elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty)
or not isinstance(prop.contained, ObjectReferenceProperty))):
raise ValueError("'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name)
_properties.update(properties)
def __init__(self, **kwargs):
@ -817,3 +845,50 @@ def CustomObservable(type='x-custom-observable', properties=None):
return _Custom
return custom_builder
def _register_extension(observable, new_extension):
"""Register a custom extension to a STIX Cyber Observable type.
"""
try:
observable_type = observable._type
except AttributeError:
raise ValueError("Unknown observable type. Custom observables must be "
"created with the @CustomObservable decorator.")
try:
EXT_MAP[observable_type][new_extension._type] = new_extension
except KeyError:
if observable_type not in OBJ_MAP_OBSERVABLE:
raise ValueError("Unknown observable type '%s'. Custom observables "
"must be created with the @CustomObservable decorator."
% observable_type)
else:
EXT_MAP[observable_type] = {new_extension._type: new_extension}
def CustomExtension(observable=None, type='x-custom-observable', properties={}):
"""Decorator for custom extensions to STIX Cyber Observables
"""
if not observable or not issubclass(observable, _Observable):
raise ValueError("'observable' must be a valid Observable class!")
def custom_builder(cls):
class _Custom(cls, _Extension):
_type = type
_properties = {
'extensions': ExtensionsProperty(enclosing_type=_type),
}
_properties.update(properties)
def __init__(self, **kwargs):
_Extension.__init__(self, **kwargs)
cls.__init__(self, **kwargs)
_register_extension(observable, _Custom)
return _Custom
return custom_builder

View File

@ -16,55 +16,18 @@ Notes:
"""
import collections
import uuid
from six import iteritems
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
from stix2.sources.filters import (FILTER_OPS, FILTER_VALUE_TYPES,
STIX_COMMON_FIELDS, STIX_COMMON_FILTERS_MAP)
def make_id():
return str(uuid.uuid4())
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
class DataStore(object):
"""
An implementer will create a concrete subclass from
@ -95,7 +58,7 @@ class DataStore(object):
stix_obj (dictionary): the STIX object to be returned
"""
return self.source.get(stix_id=stix_id)
return self.source.get(stix_id)
def all_versions(self, stix_id):
"""
@ -112,7 +75,7 @@ class DataStore(object):
STIX object)
"""
return self.source.all_versions(stix_id=stix_id)
return self.source.all_versions(stix_id)
def query(self, query):
"""
@ -137,7 +100,7 @@ class DataStore(object):
Translate add() to the appropriate DataSink call().
"""
return self.sink.add(stix_objs=stix_objs)
return self.sink.add(stix_objs)
class DataSink(object):
@ -281,35 +244,32 @@ class DataSource(object):
# evaluate objects against filter
for stix_obj in stix_objs:
clean = True
for filter in query:
try:
# skip filter as filter was identified (when added) as
# not a common filter
if filter.field not in STIX_COMMON_FIELDS:
raise Exception("Error, field: {0} is not supported for filtering on.".format(filter.field))
for filter_ in query:
# skip filter as filter was identified (when added) as
# not a common filter
if filter_.field not in STIX_COMMON_FIELDS:
raise ValueError("Error, field: {0} is not supported for filtering on.".format(filter_.field))
# For properties like granular_markings and external_references
# need to break the first property from the string.
if "." in filter.field:
field = filter.field.split(".")[0]
else:
field = filter.field
# For properties like granular_markings and external_references
# need to break the first property from the string.
if "." in filter_.field:
field = filter_.field.split(".")[0]
else:
field = filter_.field
# check filter "field" is in STIX object - if cant be
# applied due to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
if field not in stix_obj.keys():
clean = False
break
# check filter "field" is in STIX object - if cant be
# applied due to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
if field not in stix_obj.keys():
clean = False
break
match = getattr(STIXCommonPropertyFilters, field)(filter, stix_obj)
if not match:
clean = False
break
elif match == -1:
raise Exception("Error, filter operator: {0} not supported for specified field: {1}".format(filter.op, filter.field))
except Exception as e:
raise ValueError(e)
match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj)
if not match:
clean = False
break
elif match == -1:
raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
# if object unmarked after all filters, add it
if clean:
@ -524,140 +484,3 @@ class CompositeDataSource(DataSource):
"""
return self.data_sources.values()
class STIXCommonPropertyFilters(object):
"""
"""
@classmethod
def _all(cls, filter, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter.op == "=":
return stix_obj_field == filter.value
elif filter.op == "!=":
return stix_obj_field != filter.value
elif filter.op == "in":
return stix_obj_field in filter.value
elif filter.op == ">":
return stix_obj_field > filter.value
elif filter.op == "<":
return stix_obj_field < filter.value
elif filter.op == ">=":
return stix_obj_field >= filter.value
elif filter.op == "<=":
return stix_obj_field <= filter.value
else:
return -1
@classmethod
def _id(cls, filter, stix_obj_id):
"""base filter types"""
if filter.op == "=":
return stix_obj_id == filter.value
elif filter.op == "!=":
return stix_obj_id != filter.value
else:
return -1
@classmethod
def _boolean(cls, filter, stix_obj_field):
if filter.op == "=":
return stix_obj_field == filter.value
elif filter.op == "!=":
return stix_obj_field != filter.value
else:
return -1
@classmethod
def _string(cls, filter, stix_obj_field):
return cls._all(filter, stix_obj_field)
@classmethod
def _timestamp(cls, filter, stix_obj_timestamp):
return cls._all(filter, stix_obj_timestamp)
# STIX 2.0 Common Property filters
@classmethod
def created(cls, filter, stix_obj):
return cls._timestamp(filter, stix_obj["created"])
@classmethod
def created_by_ref(cls, filter, stix_obj):
return cls._id(filter, stix_obj["created_by_ref"])
@classmethod
def external_references(cls, filter, stix_obj):
"""
STIX object's can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter.field.split(".")[1]
r = cls._string(filter, er[filter_field])
if r:
return r
return False
@classmethod
def granular_markings(cls, filter, stix_obj):
"""
STIX object's can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter.field.split(".")[1]
if filter_field == "marking_ref":
return cls._id(filter, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = cls._string(filter, selector)
if r:
return r
return False
@classmethod
def id(cls, filter, stix_obj):
return cls._id(filter, stix_obj["id"])
@classmethod
def labels(cls, filter, stix_obj):
for label in stix_obj["labels"]:
r = cls._string(filter, label)
if r:
return r
return False
@classmethod
def modified(cls, filter, stix_obj):
return cls._timestamp(filter, stix_obj["modified"])
@classmethod
def object_marking_refs(cls, filter, stix_obj):
for marking_id in stix_obj["object_marking_refs"]:
r = cls._id(filter, marking_id)
if r:
return r
return False
@classmethod
def revoked(cls, filter, stix_obj):
return cls._boolean(filter, stix_obj["revoked"])
@classmethod
def type(cls, filter, stix_obj):
return cls._string(filter, stix_obj["type"])

View File

@ -13,7 +13,8 @@ import json
import os
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter
class FileSystemStore(DataStore):
@ -89,8 +90,7 @@ class FileSystemSource(DataSource):
"""
Notes:
Since FileSystem sources/sinks don't handle multiple versions
of a STIX object, this operation is futile. Pass call to get().
(Approved by G.B.)
of a STIX object, this operation is unnecessary. Pass call to get().
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]

204
stix2/sources/filters.py Normal file
View File

@ -0,0 +1,204 @@
"""
Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
Classes:
Filter
TODO: The script at the bottom of the module works (to capture
all the callable filter methods), however it causes this module
to be imported by itself twice. Not sure how big of deal that is,
or if cleaner solution possible.
"""
import collections
import types
# Currently, only STIX 2.0 common SDO fields (that are not complex objects)
# are supported for filtering on
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
# filter lookup map - STIX 2 common fields -> filter method
STIX_COMMON_FILTERS_MAP = {}
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
# primitive type filters
def _all_filter(filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
def _id_filter(filter_, stix_obj_id):
"""base filter types"""
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
def _boolean_filter(filter_, stix_obj_field):
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
def _string_filter(filter_, stix_obj_field):
return _all_filter(filter_, stix_obj_field)
def _timestamp_filter(filter_, stix_obj_timestamp):
return _all_filter(filter_, stix_obj_timestamp)
# STIX 2.0 Common Property filters
# The naming of these functions is important as
# they are used to index a mapping dictionary from
# STIX common field names to these filter functions.
#
# REQUIRED naming scheme:
# "check_<STIX field name>_filter"
def check_created_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["created"])
def check_created_by_ref_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["created_by_ref"])
def check_external_references_filter(filter_, stix_obj):
"""
STIX object's can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_.field.split(".")[1]
r = _string_filter(filter_, er[filter_field])
if r:
return r
return False
def check_granular_markings_filter(filter_, stix_obj):
"""
STIX object's can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return _id_filter(filter_, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = _string_filter(filter_, selector)
if r:
return r
return False
def check_id_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["id"])
def check_labels_filter(filter_, stix_obj):
for label in stix_obj["labels"]:
r = _string_filter(filter_, label)
if r:
return r
return False
def check_modified_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["modified"])
def check_object_marking_refs_filter(filter_, stix_obj):
for marking_id in stix_obj["object_marking_refs"]:
r = _id_filter(filter_, marking_id)
if r:
return r
return False
def check_revoked_filter(filter_, stix_obj):
return _boolean_filter(filter_, stix_obj["revoked"])
def check_type_filter(filter_, stix_obj):
return _string_filter(filter_, stix_obj["type"])
# Create mapping of field names to filter functions
for name, obj in dict(globals()).items():
if "check_" in name and isinstance(obj, types.FunctionType):
field_name = "_".join(name.split("_")[1:-1])
STIX_COMMON_FILTERS_MAP[field_name] = obj

View File

@ -25,7 +25,8 @@ import os
from stix2validator import validate_instance
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, Filter
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter
def _add(store, stix_data):
@ -155,19 +156,16 @@ class MemorySource(DataSource):
def all_versions(self, stix_id, _composite_filters=None):
"""
Notes:
Similar to get() except returns list of all object versions of
the specified "id".
Since Memory sources/sinks don't handle multiple versions of a
STIX object, this operation is unnecessary. Translate call to get().
Args:
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
return a list of objects, all the versions of the object
specified by the "id".
_composite_filters (list): list of filters passed from the
Composite Data Source.
Returns:
stix_objs (list): STIX objects that matched ``stix_id``.
(list): STIX object that matched ``stix_id``.
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]

View File

@ -12,7 +12,8 @@ TODO: Test everything
import json
from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id
from stix2.sources import DataSink, DataSource, DataStore, make_id
from stix2.sources.filters import Filter
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
@ -130,11 +131,7 @@ class TAXIICollectionSource(DataSource):
For instance - "?match[type]=indicator,sighting" should be in a
query dict as follows:
{
"field": "type"
"op": "=",
"value": "indicator,sighting"
}
Filter("type", "=", "indicator,sighting")
Args:
query (list): list of filters to extract which ones are TAXII

View File

@ -118,6 +118,20 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh
assert str(bundle) == EXPECTED_BUNDLE
def test_create_bundle_invalid(indicator, malware, relationship):
with pytest.raises(ValueError) as excinfo:
stix2.Bundle(objects=[1])
assert excinfo.value.reason == "This property may only contain a dictionary or object"
with pytest.raises(ValueError) as excinfo:
stix2.Bundle(objects=[{}])
assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object"
with pytest.raises(ValueError) as excinfo:
stix2.Bundle(objects=[{'type': 'bundle'}])
assert excinfo.value.reason == 'This property may not contain a Bundle object'
def test_parse_bundle():
bundle = stix2.parse(EXPECTED_BUNDLE)

View File

@ -6,7 +6,7 @@ from .constants import FAKE_TIME
def test_identity_custom_property():
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
stix2.Identity(
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
created="2015-12-21T19:59:11Z",
@ -15,6 +15,7 @@ def test_identity_custom_property():
identity_class="individual",
custom_properties="foobar",
)
assert str(excinfo.value) == "'custom_properties' must be a dictionary"
identity = stix2.Identity(
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
@ -31,7 +32,7 @@ def test_identity_custom_property():
def test_identity_custom_property_invalid():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
stix2.Identity(
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
created="2015-12-21T19:59:11Z",
@ -40,6 +41,9 @@ def test_identity_custom_property_invalid():
identity_class="individual",
x_foo="bar",
)
assert excinfo.value.cls == stix2.Identity
assert excinfo.value.properties == ['x_foo']
assert "Unexpected properties for" in str(excinfo.value)
def test_identity_custom_property_allowed():
@ -67,8 +71,11 @@ def test_identity_custom_property_allowed():
}""",
])
def test_parse_identity_custom_property(data):
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
identity = stix2.parse(data)
assert excinfo.value.cls == stix2.Identity
assert excinfo.value.properties == ['foo']
assert "Unexpected properties for" in str(excinfo.value)
identity = stix2.parse(data, allow_custom=True)
assert identity.foo == "bar"
@ -88,11 +95,13 @@ def test_custom_object_type():
nt = NewType(property1='something')
assert nt.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError):
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewType(property2=42)
assert "No values for required properties" in str(excinfo.value)
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
NewType(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
def test_parse_custom_object_type():
@ -106,12 +115,25 @@ def test_parse_custom_object_type():
assert nt.property1 == 'something'
def test_parse_unregistered_custom_object_type():
nt_string = """{
"type": "x-foobar-observable",
"created": "2015-12-21T19:59:11Z",
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse(nt_string)
assert "Can't parse unknown object type" in str(excinfo.value)
assert "use the CustomObject decorator." in str(excinfo.value)
@stix2.observables.CustomObservable('x-new-observable', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
('x_property3', stix2.properties.BooleanProperty()),
])
class NewObservable(object):
class NewObservable():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
@ -121,11 +143,59 @@ def test_custom_observable_object():
no = NewObservable(property1='something')
assert no.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError):
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewObservable(property2=42)
assert excinfo.value.properties == ['property1']
assert "No values for required properties" in str(excinfo.value)
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
NewObservable(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
def test_custom_observable_object_invalid_ref_property():
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomObservable('x-new-obs', [
('property_ref', stix2.properties.StringProperty()),
])
class NewObs():
pass
assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_property():
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomObservable('x-new-obs', [
('property_refs', stix2.properties.StringProperty()),
])
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_list_property():
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomObservable('x-new-obs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)),
])
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_valid_refs():
@stix2.observables.CustomObservable('x-new-obs', [
('property1', stix2.properties.StringProperty(required=True)),
('property_ref', stix2.properties.ObjectReferenceProperty(valid_types='email-addr')),
])
class NewObs():
pass
with pytest.raises(Exception) as excinfo:
NewObs(_valid_refs=['1'],
property1='something',
property_ref='1')
assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value)
def test_custom_no_properties_raises_exception():
@ -154,12 +224,34 @@ def test_parse_custom_observable_object():
assert nt.property1 == 'something'
def test_parse_unregistered_custom_observable_object():
nt_string = """{
"type": "x-foobar-observable",
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string)
assert "Can't parse unknown observable type" in str(excinfo.value)
def test_parse_invalid_custom_observable_object():
nt_string = """{
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string)
assert "Can't parse observable with no 'type' property" in str(excinfo.value)
def test_observable_custom_property():
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
NewObservable(
property1='something',
custom_properties="foobar",
)
assert "'custom_properties' must be a dictionary" in str(excinfo.value)
no = NewObservable(
property1='something',
@ -171,11 +263,13 @@ def test_observable_custom_property():
def test_observable_custom_property_invalid():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
NewObservable(
property1='something',
x_foo="bar",
)
assert excinfo.value.properties == ['x_foo']
assert "Unexpected properties for" in str(excinfo.value)
def test_observable_custom_property_allowed():
@ -197,3 +291,107 @@ def test_observed_data_with_custom_observable_object():
allow_custom=True,
)
assert ob_data.objects['0'].property1 == 'something'
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
'property2': stix2.properties.IntegerProperty(),
})
class NewExtension():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
def test_custom_extension():
ext = NewExtension(property1='something')
assert ext.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewExtension(property2=42)
assert excinfo.value.properties == ['property1']
assert str(excinfo.value) == "No values for required properties for _Custom: (property1)."
with pytest.raises(ValueError) as excinfo:
NewExtension(property1='something', property2=4)
assert str(excinfo.value) == "'property2' is too small."
def test_custom_extension_wrong_observable_type():
ext = NewExtension(property1='something')
with pytest.raises(ValueError) as excinfo:
stix2.File(name="abc.txt",
extensions={
"ntfs-ext": ext,
})
assert 'Cannot determine extension type' in excinfo.value.reason
def test_custom_extension_invalid_observable():
# These extensions are being applied to improperly-created Observables.
# The Observable classes should have been created with the CustomObservable decorator.
class Foo(object):
pass
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomExtension(Foo, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class FooExtension():
pass # pragma: no cover
assert str(excinfo.value) == "'observable' must be a valid Observable class!"
class Bar(stix2.observables._Observable):
pass
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomExtension(Bar, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class BarExtension():
pass
assert "Unknown observable type" in str(excinfo.value)
assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value)
class Baz(stix2.observables._Observable):
_type = 'Baz'
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomExtension(Baz, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class BazExtension():
pass
assert "Unknown observable type" in str(excinfo.value)
assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value)
def test_parse_observable_with_custom_extension():
input_str = """{
"type": "domain-name",
"value": "example.com",
"extensions": {
"x-new-ext": {
"property1": "foo",
"property2": 12
}
}
}"""
parsed = stix2.parse_observable(input_str)
assert parsed.extensions['x-new-ext'].property2 == 12
def test_parse_observable_with_unregistered_custom_extension():
input_str = """{
"type": "domain-name",
"value": "example.com",
"extensions": {
"x-foobar-ext": {
"property1": "foo",
"property2": 12
}
}
}"""
with pytest.raises(ValueError) as excinfo:
stix2.parse_observable(input_str)
assert "Can't parse Unknown extension type" in str(excinfo.value)

View File

@ -1,9 +1,9 @@
import pytest
from taxii2client import Collection
import stix2
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, Filter, make_id, taxii)
DataStore, make_id, taxii)
from stix2.sources.filters import Filter
from stix2.sources.memory import MemorySource, MemoryStore
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
@ -19,107 +19,110 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
STIX_OBJS1 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
@pytest.fixture
def ds():
return DataSource()
STIX_OBJS2 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND2 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND3 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND4 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND5 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND6 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND7 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND8 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_abstract_class_smoke():
@ -195,7 +198,7 @@ def test_parse_taxii_filters():
assert taxii_filters == expected_params
def test_add_get_remove_filter():
def test_add_get_remove_filter(ds):
# First 3 filters are valid, remaining fields are erroneous in some way
valid_filters = [
@ -209,8 +212,6 @@ def test_add_get_remove_filter():
Filter('created', '=', object()),
]
ds = DataSource()
assert len(ds.filters) == 0
ds.add_filter(valid_filters[0])
@ -248,7 +249,7 @@ def test_add_get_remove_filter():
ds.add_filters(valid_filters)
def test_apply_common_filters():
def test_apply_common_filters(ds):
stix_objs = [
{
"created": "2017-01-27T13:49:53.997Z",
@ -330,8 +331,6 @@ def test_apply_common_filters():
Filter("external_references.source_name", "=", "CVE"),
]
ds = DataSource()
# "Return any object whose type is not relationship"
resp = ds.apply_common_filters(stix_objs, [filters[0]])
ids = [r['id'] for r in resp]
@ -406,72 +405,72 @@ def test_apply_common_filters():
resp = ds.apply_common_filters(stix_objs, [filters[14]])
assert len(resp) == 0
# These filters are used with STIX_OBJS2 object collection.
more_filters = [
Filter("modified", "<", "2017-01-28T13:49:53.935Z"),
Filter("modified", ">", "2017-01-28T13:49:53.935Z"),
Filter("modified", ">=", "2017-01-27T13:49:53.935Z"),
Filter("modified", "<=", "2017-01-27T13:49:53.935Z"),
Filter("modified", "?", "2017-01-27T13:49:53.935Z"),
Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
Filter("notacommonproperty", "=", "bar"),
]
def test_filters0(ds):
# "Return any object modified before 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[0]])
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters1(ds):
# "Return any object modified after 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[1]])
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters2(ds):
# "Return any object modified after or on 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[2]])
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters3(ds):
# "Return any object modified before or on 2017-01-28T13:49:53.935Z"
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[3]])
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters4(ds):
fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z")
# Assert unknown operator for _all() raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[4]])
ds.apply_common_filters(STIX_OBJS2, [fltr4])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}"
.format(more_filters[4].op,
more_filters[4].field))
"for specified field: {1}").format(fltr4.op, fltr4.field)
def test_filters5(ds):
# "Return any object whose id is not indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[5]])
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters6(ds):
fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
# Assert unknown operator for _id() raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[6]])
ds.apply_common_filters(STIX_OBJS2, [fltr6])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}"
.format(more_filters[6].op,
more_filters[6].field))
"for specified field: {1}").format(fltr6.op, fltr6.field)
def test_filters7(ds):
fltr7 = Filter("notacommonproperty", "=", "bar")
# Assert unknown field raises exception.
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[7]])
ds.apply_common_filters(STIX_OBJS2, [fltr7])
assert str(excinfo.value) == ("Error, field: {0} is not supported for "
"filtering on."
.format(more_filters[7].field))
"filtering on.").format(fltr7.field)
def test_deduplicate():
ds = DataSource()
def test_deduplicate(ds):
unique = ds.deduplicate(STIX_OBJS1)
# Only 3 objects are unique

View File

@ -127,6 +127,42 @@ def test_observed_data_example_with_bad_refs():
assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope"
def test_observed_data_example_with_non_dictionary():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.ObservedData(
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T19:58:16.000Z",
modified="2016-04-06T19:58:16.000Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects="file: foo.exe",
)
assert excinfo.value.cls == stix2.ObservedData
assert excinfo.value.prop_name == "objects"
assert 'must contain a dictionary' in excinfo.value.reason
def test_observed_data_example_with_empty_dictionary():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.ObservedData(
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T19:58:16.000Z",
modified="2016-04-06T19:58:16.000Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects={},
)
assert excinfo.value.cls == stix2.ObservedData
assert excinfo.value.prop_name == "objects"
assert 'must contain a non-empty dictionary' in excinfo.value.reason
@pytest.mark.parametrize("data", [
EXPECTED,
{
@ -419,6 +455,8 @@ def test_parse_email_message_with_at_least_one_error(data):
assert excinfo.value.cls == stix2.EmailMIMEComponent
assert excinfo.value.properties == ["body", "body_raw_ref"]
assert "At least one of the" in str(excinfo.value)
assert "must be populated" in str(excinfo.value)
@pytest.mark.parametrize("data", [
@ -558,7 +596,7 @@ def test_artifact_mutual_exclusion_error():
assert excinfo.value.cls == stix2.Artifact
assert excinfo.value.properties == ["payload_bin", "url"]
assert str(excinfo.value) == "The (payload_bin, url) properties for Artifact are mutually exclusive."
assert 'are mutually exclusive' in str(excinfo.value)
def test_directory_example():
@ -804,6 +842,8 @@ def test_file_example_encryption_error():
assert excinfo.value.cls == stix2.File
assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")]
assert "property dependencies" in str(excinfo.value)
assert "are not met" in str(excinfo.value)
with pytest.raises(stix2.exceptions.DependentPropertiesError) as excinfo:
stix2.File(name="qwerty.dll",

View File

@ -5,10 +5,10 @@ from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
from stix2.observables import EmailMIMEComponent, ExtensionsProperty
from stix2.properties import (BinaryProperty, BooleanProperty,
DictionaryProperty, EmbeddedObjectProperty,
EnumProperty, HashesProperty, HexProperty,
IDProperty, IntegerProperty, ListProperty,
Property, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
EnumProperty, FloatProperty, HashesProperty,
HexProperty, IDProperty, IntegerProperty,
ListProperty, Property, ReferenceProperty,
StringProperty, TimestampProperty, TypeProperty)
from .constants import FAKE_TIME
@ -119,6 +119,27 @@ def test_integer_property_invalid(value):
int_prop.clean(value)
@pytest.mark.parametrize("value", [
2,
-1,
3.14,
False,
])
def test_float_property_valid(value):
int_prop = FloatProperty()
assert int_prop.clean(value) is not None
@pytest.mark.parametrize("value", [
"something",
StringProperty(),
])
def test_float_property_invalid(value):
int_prop = FloatProperty()
with pytest.raises(ValueError):
int_prop.clean(value)
@pytest.mark.parametrize("value", [
True,
False,
@ -215,7 +236,7 @@ def test_dictionary_property_valid(d):
[{'Hey!': 'something'}, "Invalid dictionary key Hey!: (contains characters other thanlowercase a-z, "
"uppercase A-Z, numerals 0-9, hyphen (-), or underscore (_))."],
])
def test_dictionary_property_invalid(d):
def test_dictionary_property_invalid_key(d):
dict_prop = DictionaryProperty()
with pytest.raises(DictionaryKeyError) as excinfo:
@ -224,6 +245,26 @@ def test_dictionary_property_invalid(d):
assert str(excinfo.value) == d[1]
@pytest.mark.parametrize("d", [
({}, "The dictionary property must contain a non-empty dictionary"),
# TODO: This error message could be made more helpful. The error is caused
# because `json.loads()` doesn't like the *single* quotes around the key
# name, even though they are valid in a Python dictionary. While technically
# accurate (a string is not a dictionary), if we want to be able to load
# string-encoded "dictionaries" that are, we need a better error message
# or an alternative to `json.loads()` ... and preferably *not* `eval()`. :-)
# Changing the following to `'{"description": "something"}'` does not cause
# any ValueError to be raised.
("{'description': 'something'}", "The dictionary property must contain a dictionary"),
])
def test_dictionary_property_invalid(d):
dict_prop = DictionaryProperty()
with pytest.raises(ValueError) as excinfo:
dict_prop.clean(d[0])
assert str(excinfo.value) == d[1]
@pytest.mark.parametrize("value", [
{"sha256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b"},
[('MD5', '2dfb1bcc980200c6706feee399d41b3f'), ('RIPEMD-160', 'b3a8cd8a27c90af79b3c81754f267780f443dfef')],
@ -257,10 +298,18 @@ def test_embedded_property():
emb_prop.clean("string")
def test_enum_property():
enum_prop = EnumProperty(['a', 'b', 'c'])
@pytest.mark.parametrize("value", [
['a', 'b', 'c'],
('a', 'b', 'c'),
'b',
])
def test_enum_property_valid(value):
enum_prop = EnumProperty(value)
assert enum_prop.clean('b')
def test_enum_property_invalid():
enum_prop = EnumProperty(['a', 'b', 'c'])
with pytest.raises(ValueError):
enum_prop.clean('z')

View File

@ -169,6 +169,7 @@ def test_versioning_error_new_version_of_revoked():
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.new_version(name="barney")
assert str(excinfo.value) == "Cannot create a new version of a revoked object."
assert excinfo.value.called_by == "new_version"
assert str(excinfo.value) == "Cannot create a new version of a revoked object."
@ -188,6 +189,7 @@ def test_versioning_error_revoke_of_revoked():
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.revoke()
assert str(excinfo.value) == "Cannot revoke an already revoked object."
assert excinfo.value.called_by == "revoke"
assert str(excinfo.value) == "Cannot revoke an already revoked object."