From 94ccd422c9ae98f5210ceaf063e426ee80f53e34 Mon Sep 17 00:00:00 2001 From: clenk Date: Wed, 23 Aug 2017 13:50:31 -0400 Subject: [PATCH 1/9] Make isort recognize stix2patterns --- .isort.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.isort.cfg b/.isort.cfg index 63f5b73..badf815 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -2,6 +2,6 @@ check=1 diff=1 known_third_party=dateutil,pytest,pytz,six,requests -known_first_party=stix2 +known_first_party=stix2,stix2patterns not_skip=__init__.py force_sort_within_sections=1 From ee49e78c72163e40c8cdae0b43914434e47d24ca Mon Sep 17 00:00:00 2001 From: clenk Date: Wed, 23 Aug 2017 18:36:24 -0400 Subject: [PATCH 2/9] Add custom extensions to cyber observables Fix #31. --- stix2/observables.py | 124 ++++++++++++++++++++++++++++---------- stix2/test/test_custom.py | 57 ++++++++++++++++++ 2 files changed, 150 insertions(+), 31 deletions(-) diff --git a/stix2/observables.py b/stix2/observables.py index 366e007..3c033bd 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -24,7 +24,7 @@ class ObservableProperty(Property): except ValueError: raise ValueError("The observable property must contain a dictionary") if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") + raise ValueError("The observable property must contain a non-empty dictionary") valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) @@ -53,7 +53,7 @@ class ExtensionsProperty(DictionaryProperty): except ValueError: raise ValueError("The extensions property must contain a dictionary") if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") + raise ValueError("The extensions property must contain a non-empty dictionary") if self.enclosing_type in EXT_MAP: specific_type_map = EXT_MAP[self.enclosing_type] @@ -69,7 +69,7 @@ class ExtensionsProperty(DictionaryProperty): else: raise ValueError("The key used in the extensions dictionary is not an extension type name") else: - raise ValueError("The enclosing type has no extensions defined") + raise ValueError("The enclosing type '%s' has no extensions defined" % self.enclosing_type) return dictified @@ -77,6 +77,7 @@ class Artifact(_Observable): _type = 'artifact' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'mime_type': StringProperty(), 'payload_bin': BinaryProperty(), 'url': StringProperty(), @@ -93,6 +94,7 @@ class AutonomousSystem(_Observable): _type = 'autonomous-system' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'number': IntegerProperty(), 'name': StringProperty(), 'rir': StringProperty(), @@ -103,6 +105,7 @@ class Directory(_Observable): _type = 'directory' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'path': StringProperty(required=True), 'path_enc': StringProperty(), # these are not the created/modified timestamps of the object itself @@ -117,6 +120,7 @@ class DomainName(_Observable): _type = 'domain-name' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])), } @@ -126,6 +130,7 @@ class EmailAddress(_Observable): _type = 'email-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'display_name': StringProperty(), 'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'), @@ -149,6 +154,7 @@ class EmailMessage(_Observable): _type = 'email-message' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_multipart': BooleanProperty(required=True), 'date': TimestampProperty(), 'content_type': StringProperty(), @@ -174,6 +180,7 @@ class EmailMessage(_Observable): class ArchiveExt(_Extension): + _type = 'archive-ext' _properties = { 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True), 'version': StringProperty(), @@ -190,6 +197,7 @@ class AlternateDataStream(_STIXBase): class NTFSExt(_Extension): + _type = 'ntfs-ext' _properties = { 'sid': StringProperty(), 'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)), @@ -197,6 +205,7 @@ class NTFSExt(_Extension): class PDFExt(_Extension): + _type = 'pdf-ext' _properties = { 'version': StringProperty(), 'is_optimized': BooleanProperty(), @@ -207,6 +216,7 @@ class PDFExt(_Extension): class RasterImageExt(_Extension): + _type = 'raster-image-ext' _properties = { 'image_height': IntegerProperty(), 'image_weight': IntegerProperty(), @@ -266,6 +276,7 @@ class WindowsPESection(_STIXBase): class WindowsPEBinaryExt(_Extension): + _type = 'windows-pebinary-ext' _properties = { 'pe_type': StringProperty(required=True), # open_vocab 'imphash': StringProperty(), @@ -315,6 +326,7 @@ class IPv4Address(_Observable): _type = 'ipv4-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), @@ -325,6 +337,7 @@ class IPv6Address(_Observable): _type = 'ipv6-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), @@ -335,6 +348,7 @@ class MACAddress(_Observable): _type = 'mac-addr' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), } @@ -343,11 +357,13 @@ class Mutex(_Observable): _type = 'mutex' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'name': StringProperty(), } class HTTPRequestExt(_Extension): + _type = 'http-request-ext' _properties = { 'request_method': StringProperty(required=True), 'request_value': StringProperty(required=True), @@ -359,6 +375,7 @@ class HTTPRequestExt(_Extension): class ICMPExt(_Extension): + _type = 'icmp-ext' _properties = { 'icmp_type_hex': HexProperty(required=True), 'icmp_code_hex': HexProperty(required=True), @@ -366,6 +383,7 @@ class ICMPExt(_Extension): class SocketExt(_Extension): + _type = 'socket-ext' _properties = { 'address_family': EnumProperty([ "AF_UNSPEC", @@ -399,6 +417,7 @@ class SocketExt(_Extension): class TCPExt(_Extension): + _type = 'tcp-ext' _properties = { 'src_flags_hex': HexProperty(), 'dst_flags_hex': HexProperty(), @@ -435,6 +454,7 @@ class NetworkTraffic(_Observable): class WindowsProcessExt(_Extension): + _type = 'windows-process-ext' _properties = { 'aslr_enabled': BooleanProperty(), 'dep_enabled': BooleanProperty(), @@ -446,6 +466,7 @@ class WindowsProcessExt(_Extension): class WindowsServiceExt(_Extension): + _type = 'windows-service-ext' _properties = { 'service_name': StringProperty(required=True), 'descriptions': ListProperty(StringProperty), @@ -517,6 +538,7 @@ class Software(_Observable): _type = 'software' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'name': StringProperty(required=True), 'cpe': StringProperty(), 'languages': ListProperty(StringProperty), @@ -529,11 +551,13 @@ class URL(_Observable): _type = 'url' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'value': StringProperty(required=True), } class UNIXAccountExt(_Extension): + _type = 'unix-account-ext' _properties = { 'gid': IntegerProperty(), 'groups': ListProperty(StringProperty), @@ -590,6 +614,7 @@ class WindowsRegistryKey(_Observable): _type = 'windows-registry-key' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'key': StringProperty(required=True), 'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)), # this is not the modified timestamps of the object itself @@ -630,6 +655,7 @@ class X509Certificate(_Observable): _type = 'x509-certificate' _properties = { 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), 'is_self_signed': BooleanProperty(), 'hashes': HashesProperty(), 'version': StringProperty(), @@ -667,36 +693,28 @@ OBJ_MAP_OBSERVABLE = { 'x509-certificate': X509Certificate, } -EXT_MAP_FILE = { - 'archive-ext': ArchiveExt, - 'ntfs-ext': NTFSExt, - 'pdf-ext': PDFExt, - 'raster-image-ext': RasterImageExt, - 'windows-pebinary-ext': WindowsPEBinaryExt -} - -EXT_MAP_NETWORK_TRAFFIC = { - 'http-request-ext': HTTPRequestExt, - 'icmp-ext': ICMPExt, - 'socket-ext': SocketExt, - 'tcp-ext': TCPExt, -} - -EXT_MAP_PROCESS = { - 'windows-process-ext': WindowsProcessExt, - 'windows-service-ext': WindowsServiceExt, -} - -EXT_MAP_USER_ACCOUNT = { - 'unix-account-ext': UNIXAccountExt, -} EXT_MAP = { - 'file': EXT_MAP_FILE, - 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, - 'process': EXT_MAP_PROCESS, - 'user-account': EXT_MAP_USER_ACCOUNT, - + 'file': { + 'archive-ext': ArchiveExt, + 'ntfs-ext': NTFSExt, + 'pdf-ext': PDFExt, + 'raster-image-ext': RasterImageExt, + 'windows-pebinary-ext': WindowsPEBinaryExt + }, + 'network-traffic': { + 'http-request-ext': HTTPRequestExt, + 'icmp-ext': ICMPExt, + 'socket-ext': SocketExt, + 'tcp-ext': TCPExt, + }, + 'process': { + 'windows-process-ext': WindowsProcessExt, + 'windows-service-ext': WindowsServiceExt, + }, + 'user-account': { + 'unix-account-ext': UNIXAccountExt, + }, } @@ -760,3 +778,47 @@ def CustomObservable(type='x-custom-observable', properties={}): return _Custom return custom_builder + + +def _register_extension(observable, new_extension): + """Register a custom extension to a STIX Cyber Observable type. + """ + + try: + observable_type = observable._type + except AttributeError: + raise ValueError("Custom observables must be created with the @CustomObservable decorator.") + + try: + EXT_MAP[observable_type][new_extension._type] = new_extension + except KeyError: + if observable_type not in OBJ_MAP_OBSERVABLE: + raise ValueError("Unknown observable type '%s'" % observable_type) + else: + EXT_MAP[observable_type] = {new_extension._type: new_extension} + + +def CustomExtension(observable=None, type='x-custom-observable', properties={}): + """Decorator for custom extensions to STIX Cyber Observables + """ + + if not observable or not issubclass(observable, _Observable): + raise ValueError("'observable' must be a valid Observable class!") + + def custom_builder(cls): + + class _Custom(cls, _Extension): + _type = type + _properties = { + 'extensions': ExtensionsProperty(enclosing_type=_type), + } + _properties.update(properties) + + def __init__(self, **kwargs): + _Extension.__init__(self, **kwargs) + cls.__init__(self, **kwargs) + + _register_extension(observable, _Custom) + return _Custom + + return custom_builder diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 60e982c..804776a 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -180,3 +180,60 @@ def test_observed_data_with_custom_observable_object(): allow_custom=True, ) assert ob_data.objects['0'].property1 == 'something' + + +@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + 'property2': stix2.properties.IntegerProperty(), +}) +class NewExtension(): + def __init__(self, property2=None, **kwargs): + if property2 and property2 < 10: + raise ValueError("'property2' is too small.") + + +def test_custom_extension(): + ext = NewExtension(property1='something') + assert ext.property1 == 'something' + + with pytest.raises(stix2.exceptions.MissingPropertiesError): + NewExtension(property2=42) + + with pytest.raises(ValueError): + NewExtension(property1='something', property2=4) + + +def test_custom_extension_invalid(): + class Foo(object): + pass + with pytest.raises(ValueError): + @stix2.observables.CustomExtension(Foo, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class FooExtension(): + pass # pragma: no cover + + class Bar(stix2.observables._Observable): + pass + with pytest.raises(ValueError): + @stix2.observables.CustomExtension(Bar, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class BarExtension(): + pass + + +def test_parse_observable_with_custom_extension(): + input_str = """{ + "type": "domain-name", + "value": "example.com", + "extensions": { + "x-new-ext": { + "property1": "foo", + "property2": 12 + } + } + }""" + + parsed = stix2.parse_observable(input_str) + assert parsed.extensions['x-new-ext'].property2 == 12 From 6fa3c84aa3073de13fb64ab02b324fae2837ec43 Mon Sep 17 00:00:00 2001 From: clenk Date: Thu, 24 Aug 2017 15:46:36 -0400 Subject: [PATCH 3/9] Improve tests and coverage for custom content In ObservableProperty.clean(), parse_observable() will handle the issue of non-cyber observable objects being presnt in an observable property. The line raising a ValueError would not be reached so it was removed. --- stix2/observables.py | 17 +++--- stix2/test/test_custom.py | 111 +++++++++++++++++++++++++++++++++----- 2 files changed, 106 insertions(+), 22 deletions(-) diff --git a/stix2/observables.py b/stix2/observables.py index 3c033bd..50eb6d1 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -31,9 +31,6 @@ class ObservableProperty(Property): # from .__init__ import parse_observable # avoid circular import for key, obj in dictified.items(): parsed_obj = parse_observable(obj, valid_refs) - if not issubclass(type(parsed_obj), _Observable): - raise ValueError("Objects in an observable property must be " - "Cyber Observable Objects") dictified[key] = parsed_obj return dictified @@ -734,16 +731,17 @@ def parse_observable(data, _valid_refs=[], allow_custom=False): obj['_valid_refs'] = _valid_refs if 'type' not in obj: - raise ParseError("Can't parse object with no 'type' property: %s" % str(obj)) + raise ParseError("Can't parse observable with no 'type' property: %s" % str(obj)) try: obj_class = OBJ_MAP_OBSERVABLE[obj['type']] except KeyError: - raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type']) + raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " + "use the CustomObservable decorator." % obj['type']) if 'extensions' in obj and obj['type'] in EXT_MAP: for name, ext in obj['extensions'].items(): if name not in EXT_MAP[obj['type']]: - raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) + raise ParseError("Can't parse Unknown extension type '%s' for observable type '%s'!" % (name, obj['type'])) ext_class = EXT_MAP[obj['type']][name] obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) @@ -787,13 +785,16 @@ def _register_extension(observable, new_extension): try: observable_type = observable._type except AttributeError: - raise ValueError("Custom observables must be created with the @CustomObservable decorator.") + raise ValueError("Unknown observable type. Custom observables must be " + "created with the @CustomObservable decorator.") try: EXT_MAP[observable_type][new_extension._type] = new_extension except KeyError: if observable_type not in OBJ_MAP_OBSERVABLE: - raise ValueError("Unknown observable type '%s'" % observable_type) + raise ValueError("Unknown observable type '%s'. Custom observables " + "must be created with the @CustomObservable decorator." + % observable_type) else: EXT_MAP[observable_type] = {new_extension._type: new_extension} diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 804776a..33d9287 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -6,7 +6,7 @@ from .constants import FAKE_TIME def test_identity_custom_property(): - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", created="2015-12-21T19:59:11Z", @@ -15,6 +15,7 @@ def test_identity_custom_property(): identity_class="individual", custom_properties="foobar", ) + assert str(excinfo.value) == "'custom_properties' must be a dictionary" identity = stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", @@ -31,7 +32,7 @@ def test_identity_custom_property(): def test_identity_custom_property_invalid(): - with pytest.raises(stix2.exceptions.ExtraPropertiesError): + with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: stix2.Identity( id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c", created="2015-12-21T19:59:11Z", @@ -40,6 +41,9 @@ def test_identity_custom_property_invalid(): identity_class="individual", x_foo="bar", ) + assert excinfo.value.cls == stix2.Identity + assert excinfo.value.properties == ['x_foo'] + assert "Unexpected properties for" in str(excinfo.value) def test_identity_custom_property_allowed(): @@ -67,8 +71,11 @@ def test_identity_custom_property_allowed(): }""", ]) def test_parse_identity_custom_property(data): - with pytest.raises(stix2.exceptions.ExtraPropertiesError): + with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: identity = stix2.parse(data) + assert excinfo.value.cls == stix2.Identity + assert excinfo.value.properties == ['foo'] + assert "Unexpected properties for" in str(excinfo.value) identity = stix2.parse(data, allow_custom=True) assert identity.foo == "bar" @@ -88,11 +95,13 @@ def test_custom_object_type(): nt = NewType(property1='something') assert nt.property1 == 'something' - with pytest.raises(stix2.exceptions.MissingPropertiesError): + with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: NewType(property2=42) + assert "No values for required properties" in str(excinfo.value) - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewType(property1='something', property2=4) + assert "'property2' is too small." in str(excinfo.value) def test_parse_custom_object_type(): @@ -120,11 +129,14 @@ def test_custom_observable_object(): no = NewObservable(property1='something') assert no.property1 == 'something' - with pytest.raises(stix2.exceptions.MissingPropertiesError): + with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: NewObservable(property2=42) + assert excinfo.value.properties == ['property1'] + assert "No values for required properties" in str(excinfo.value) - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewObservable(property1='something', property2=4) + assert "'property2' is too small." in str(excinfo.value) def test_parse_custom_observable_object(): @@ -137,12 +149,34 @@ def test_parse_custom_observable_object(): assert nt.property1 == 'something' +def test_parse_unregistered_custom_observable_object(): + nt_string = """{ + "type": "x-foobar-observable", + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse_observable(nt_string) + assert "Can't parse unknown observable type" in str(excinfo.value) + + +def test_parse_invalid_custom_observable_object(): + nt_string = """{ + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse_observable(nt_string) + assert "Can't parse observable with no 'type' property" in str(excinfo.value) + + def test_observable_custom_property(): - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewObservable( property1='something', custom_properties="foobar", ) + assert "'custom_properties' must be a dictionary" in str(excinfo.value) no = NewObservable( property1='something', @@ -154,11 +188,13 @@ def test_observable_custom_property(): def test_observable_custom_property_invalid(): - with pytest.raises(stix2.exceptions.ExtraPropertiesError): + with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo: NewObservable( property1='something', x_foo="bar", ) + assert excinfo.value.properties == ['x_foo'] + assert "Unexpected properties for" in str(excinfo.value) def test_observable_custom_property_allowed(): @@ -196,31 +232,61 @@ def test_custom_extension(): ext = NewExtension(property1='something') assert ext.property1 == 'something' - with pytest.raises(stix2.exceptions.MissingPropertiesError): + with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: NewExtension(property2=42) + assert excinfo.value.properties == ['property1'] + assert str(excinfo.value) == "No values for required properties for _Custom: (property1)." - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: NewExtension(property1='something', property2=4) + assert str(excinfo.value) == "'property2' is too small." -def test_custom_extension_invalid(): +def test_custom_extension_wrong_observable_type(): + ext = NewExtension(property1='something') + with pytest.raises(ValueError) as excinfo: + stix2.File(name="abc.txt", + extensions={ + "ntfs-ext": ext, + }) + + assert 'Cannot determine extension type' in excinfo.value.reason + + +def test_custom_extension_invalid_observable(): + # These extensions are being applied to improperly-created Observables. + # The Observable classes should have been created with the CustomObservable decorator. class Foo(object): pass - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: @stix2.observables.CustomExtension(Foo, 'x-new-ext', { 'property1': stix2.properties.StringProperty(required=True), }) class FooExtension(): pass # pragma: no cover + assert str(excinfo.value) == "'observable' must be a valid Observable class!" class Bar(stix2.observables._Observable): pass - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: @stix2.observables.CustomExtension(Bar, 'x-new-ext', { 'property1': stix2.properties.StringProperty(required=True), }) class BarExtension(): pass + assert "Unknown observable type" in str(excinfo.value) + assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value) + + class Baz(stix2.observables._Observable): + _type = 'Baz' + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomExtension(Baz, 'x-new-ext', { + 'property1': stix2.properties.StringProperty(required=True), + }) + class BazExtension(): + pass + assert "Unknown observable type" in str(excinfo.value) + assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value) def test_parse_observable_with_custom_extension(): @@ -237,3 +303,20 @@ def test_parse_observable_with_custom_extension(): parsed = stix2.parse_observable(input_str) assert parsed.extensions['x-new-ext'].property2 == 12 + + +def test_parse_observable_with_unregistered_custom_extension(): + input_str = """{ + "type": "domain-name", + "value": "example.com", + "extensions": { + "x-foobar-ext": { + "property1": "foo", + "property2": 12 + } + } + }""" + + with pytest.raises(ValueError) as excinfo: + stix2.parse_observable(input_str) + assert "Can't parse Unknown extension type" in str(excinfo.value) From 0cd75e3fba56f603af150a31f5b7c740e7e1d381 Mon Sep 17 00:00:00 2001 From: clenk Date: Thu, 24 Aug 2017 17:53:43 -0400 Subject: [PATCH 4/9] Increase code coverage --- stix2/exceptions.py | 2 +- stix2/test/test_bundle.py | 14 ++++++++ stix2/test/test_custom.py | 43 +++++++++++++++++++++++++ stix2/test/test_observed_data.py | 41 ++++++++++++++++++++++++ stix2/test/test_properties.py | 55 ++++++++++++++++++++++++++++---- stix2/test/test_versioning.py | 2 ++ 6 files changed, 149 insertions(+), 8 deletions(-) diff --git a/stix2/exceptions.py b/stix2/exceptions.py index ef47dd0..8a33afa 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -121,7 +121,7 @@ class DependentPropertiesError(STIXError, TypeError): def __str__(self): msg = "The property dependencies for {0}: ({1}) are not met." return msg.format(self.cls.__name__, - ", ".join(x for x in self.dependencies)) + ", ".join(x for x, y in self.dependencies)) class AtLeastOnePropertyError(STIXError, TypeError): diff --git a/stix2/test/test_bundle.py b/stix2/test/test_bundle.py index 0733637..268c2f7 100644 --- a/stix2/test/test_bundle.py +++ b/stix2/test/test_bundle.py @@ -118,6 +118,20 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh assert str(bundle) == EXPECTED_BUNDLE +def test_create_bundle_invalid(indicator, malware, relationship): + with pytest.raises(ValueError) as excinfo: + stix2.Bundle(objects=[1]) + assert excinfo.value.reason == "This property may only contain a dictionary or object" + + with pytest.raises(ValueError) as excinfo: + stix2.Bundle(objects=[{}]) + assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object" + + with pytest.raises(ValueError) as excinfo: + stix2.Bundle(objects=[{'type': 'bundle'}]) + assert excinfo.value.reason == 'This property may not contain a Bundle object' + + def test_parse_bundle(): bundle = stix2.parse(EXPECTED_BUNDLE) diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 33d9287..d2bb16f 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -115,6 +115,19 @@ def test_parse_custom_object_type(): assert nt.property1 == 'something' +def test_parse_unregistered_custom_object_type(): + nt_string = """{ + "type": "x-foobar-observable", + "created": "2015-12-21T19:59:11Z", + "property1": "something" + }""" + + with pytest.raises(stix2.exceptions.ParseError) as excinfo: + stix2.parse(nt_string) + assert "Can't parse unknown object type" in str(excinfo.value) + assert "use the CustomObject decorator." in str(excinfo.value) + + @stix2.observables.CustomObservable('x-new-observable', { 'property1': stix2.properties.StringProperty(required=True), 'property2': stix2.properties.IntegerProperty(), @@ -139,6 +152,36 @@ def test_custom_observable_object(): assert "'property2' is too small." in str(excinfo.value) +def test_custom_observable_object_invalid_ref_property(): + @stix2.observables.CustomObservable('x-new-obs', { + 'property1': stix2.properties.StringProperty(required=True), + 'property_ref': stix2.properties.StringProperty(), + }) + class NewObs(): + pass + + with pytest.raises(ValueError) as excinfo: + NewObs(_valid_refs={'1': 'file'}, + property1='something', + property_ref='1') + assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value) + + +def test_custom_observable_object_invalid_valid_refs(): + @stix2.observables.CustomObservable('x-new-obs', { + 'property1': stix2.properties.StringProperty(required=True), + 'property_ref': stix2.properties.ObjectReferenceProperty(valid_types='email-addr'), + }) + class NewObs(): + pass + + with pytest.raises(Exception) as excinfo: + NewObs(_valid_refs=['1'], + property1='something', + property_ref='1') + assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value) + + def test_parse_custom_observable_object(): nt_string = """{ "type": "x-new-observable", diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index d5641e7..1c71b22 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -125,6 +125,42 @@ def test_observed_data_example_with_bad_refs(): assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope" +def test_observed_data_example_with_non_dictionary(): + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16.000Z", + modified="2016-04-06T19:58:16.000Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects="file: foo.exe", + ) + + assert excinfo.value.cls == stix2.ObservedData + assert excinfo.value.prop_name == "objects" + assert 'must contain a dictionary' in excinfo.value.reason + + +def test_observed_data_example_with_empty_dictionary(): + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16.000Z", + modified="2016-04-06T19:58:16.000Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects={}, + ) + + assert excinfo.value.cls == stix2.ObservedData + assert excinfo.value.prop_name == "objects" + assert 'must contain a non-empty dictionary' in excinfo.value.reason + + @pytest.mark.parametrize("data", [ EXPECTED, { @@ -416,6 +452,8 @@ def test_parse_email_message_with_at_least_one_error(data): assert excinfo.value.cls == stix2.EmailMIMEComponent assert excinfo.value.properties == ["body", "body_raw_ref"] + assert "At least one of the" in str(excinfo.value) + assert "must be populated" in str(excinfo.value) @pytest.mark.parametrize("data", [ @@ -555,6 +593,7 @@ def test_artifact_mutual_exclusion_error(): assert excinfo.value.cls == stix2.Artifact assert excinfo.value.properties == ["payload_bin", "url"] + assert 'are mutually exclusive' in str(excinfo.value) def test_directory_example(): @@ -800,6 +839,8 @@ def test_file_example_encryption_error(): assert excinfo.value.cls == stix2.File assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")] + assert "property dependencies" in str(excinfo.value) + assert "are not met" in str(excinfo.value) with pytest.raises(stix2.exceptions.DependentPropertiesError) as excinfo: stix2.File(name="qwerty.dll", diff --git a/stix2/test/test_properties.py b/stix2/test/test_properties.py index 01daebf..4efc51a 100644 --- a/stix2/test/test_properties.py +++ b/stix2/test/test_properties.py @@ -5,10 +5,10 @@ from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError from stix2.observables import EmailMIMEComponent, ExtensionsProperty from stix2.properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, - EnumProperty, HashesProperty, HexProperty, - IDProperty, IntegerProperty, ListProperty, - Property, ReferenceProperty, StringProperty, - TimestampProperty, TypeProperty) + EnumProperty, FloatProperty, HashesProperty, + HexProperty, IDProperty, IntegerProperty, + ListProperty, Property, ReferenceProperty, + StringProperty, TimestampProperty, TypeProperty) from .constants import FAKE_TIME @@ -119,6 +119,27 @@ def test_integer_property_invalid(value): int_prop.clean(value) +@pytest.mark.parametrize("value", [ + 2, + -1, + 3.14, + False, +]) +def test_float_property_valid(value): + int_prop = FloatProperty() + assert int_prop.clean(value) is not None + + +@pytest.mark.parametrize("value", [ + "something", + StringProperty(), +]) +def test_float_property_invalid(value): + int_prop = FloatProperty() + with pytest.raises(ValueError): + int_prop.clean(value) + + @pytest.mark.parametrize("value", [ True, False, @@ -210,10 +231,22 @@ def test_dictionary_property_valid(d): {'a'*300: 'something'}, {'Hey!': 'something'}, ]) +def test_dictionary_property_invalid_key(d): + dict_prop = DictionaryProperty() + + with pytest.raises(DictionaryKeyError) as excinfo: + dict_prop.clean(d) + assert "Invalid dictionary key" in str(excinfo.value) + + +@pytest.mark.parametrize("d", [ + {}, + "{'description': 'something'}", +]) def test_dictionary_property_invalid(d): dict_prop = DictionaryProperty() - with pytest.raises(DictionaryKeyError): + with pytest.raises(ValueError): dict_prop.clean(d) @@ -250,10 +283,18 @@ def test_embedded_property(): emb_prop.clean("string") -def test_enum_property(): - enum_prop = EnumProperty(['a', 'b', 'c']) +@pytest.mark.parametrize("value", [ + ['a', 'b', 'c'], + ('a', 'b', 'c'), + 'b', +]) +def test_enum_property_valid(value): + enum_prop = EnumProperty(value) assert enum_prop.clean('b') + +def test_enum_property_invalid(): + enum_prop = EnumProperty(['a', 'b', 'c']) with pytest.raises(ValueError): enum_prop.clean('z') diff --git a/stix2/test/test_versioning.py b/stix2/test/test_versioning.py index 281ae71..42584ad 100644 --- a/stix2/test/test_versioning.py +++ b/stix2/test/test_versioning.py @@ -160,6 +160,7 @@ def test_versioning_error_new_version_of_revoked(): with pytest.raises(stix2.exceptions.RevokeError) as excinfo: campaign_v2.new_version(name="barney") + assert str(excinfo.value) == "Cannot create a new version of a revoked object." assert excinfo.value.called_by == "new_version" @@ -178,5 +179,6 @@ def test_versioning_error_revoke_of_revoked(): with pytest.raises(stix2.exceptions.RevokeError) as excinfo: campaign_v2.revoke() + assert str(excinfo.value) == "Cannot revoke an already revoked object." assert excinfo.value.called_by == "revoke" From 22c749d0dfb2a07bc003d540ce47c1e06d935f03 Mon Sep 17 00:00:00 2001 From: = Date: Wed, 30 Aug 2017 11:18:11 -0400 Subject: [PATCH 5/9] filter changes --- stix2/sources/__init__.py | 180 +--------------------------- stix2/sources/filesystem.py | 23 +--- stix2/sources/filters.py | 227 ++++++++++++++++++++++++++++++++++++ stix2/sources/memory.py | 23 +--- stix2/sources/taxii.py | 20 +--- 5 files changed, 240 insertions(+), 233 deletions(-) create mode 100644 stix2/sources/filters.py diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index d8676ca..ba5528b 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -16,56 +16,19 @@ Notes: """ -import collections import copy import uuid from six import iteritems - -class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): - __slots__ = () - - def __new__(cls, field, op, value): - # If value is a list, convert it to a tuple so it is hashable. - if isinstance(value, list): - value = tuple(value) - self = super(Filter, cls).__new__(cls, field, op, value) - return self +from filters import (FILTER_OPS, FILTER_VALUE_TYPES, STIX_COMMON_FIELDS, + STIX_COMMON_FILTERS_MAP) def make_id(): return str(uuid.uuid4()) -# Currently, only STIX 2.0 common SDO fields (that are not complex objects) -# are supported for filtering on -STIX_COMMON_FIELDS = [ - "created", - "created_by_ref", - "external_references.source_name", - "external_references.description", - "external_references.url", - "external_references.hashes", - "external_references.external_id", - "granular_markings.marking_ref", - "granular_markings.selectors", - "id", - "labels", - "modified", - "object_marking_refs", - "revoked", - "type", - "granular_markings" -] - -# Supported filter operations -FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] - -# Supported filter value types -FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple] - - class DataStore(object): """ An implementer will create a concrete subclass from @@ -306,7 +269,7 @@ class DataSource(object): clean = False break try: - match = getattr(STIXCommonPropertyFilters, filter_.field)(filter_, stix_obj) + match = STIX_COMMON_FILTERS_MAP[filter_.field](filter_, stix_obj) if not match: clean = False break @@ -527,140 +490,3 @@ class CompositeDataSource(DataSource): """ return copy.deepcopy(self.data_sources.values()) - - -class STIXCommonPropertyFilters(object): - """ - """ - @classmethod - def _all(cls, filter_, stix_obj_field): - """all filter operations (for filters whose value type can be applied to any operation type)""" - if filter_.op == "=": - return stix_obj_field == filter_.value - elif filter_.op == "!=": - return stix_obj_field != filter_.value - elif filter_.op == "in": - return stix_obj_field in filter_.value - elif filter_.op == ">": - return stix_obj_field > filter_.value - elif filter_.op == "<": - return stix_obj_field < filter_.value - elif filter_.op == ">=": - return stix_obj_field >= filter_.value - elif filter_.op == "<=": - return stix_obj_field <= filter_.value - else: - return -1 - - @classmethod - def _id(cls, filter_, stix_obj_id): - """base filter types""" - if filter_.op == "=": - return stix_obj_id == filter_.value - elif filter_.op == "!=": - return stix_obj_id != filter_.value - else: - return -1 - - @classmethod - def _boolean(cls, filter_, stix_obj_field): - if filter_.op == "=": - return stix_obj_field == filter_.value - elif filter_.op == "!=": - return stix_obj_field != filter_.value - else: - return -1 - - @classmethod - def _string(cls, filter_, stix_obj_field): - return cls._all(filter_, stix_obj_field) - - @classmethod - def _timestamp(cls, filter_, stix_obj_timestamp): - return cls._all(filter_, stix_obj_timestamp) - - # STIX 2.0 Common Property filters - @classmethod - def created(cls, filter_, stix_obj): - return cls._timestamp(filter_, stix_obj["created"]) - - @classmethod - def created_by_ref(cls, filter_, stix_obj): - return cls._id(filter_, stix_obj["created_by_ref"]) - - @classmethod - def external_references(cls, filter_, stix_obj): - """ - STIX object's can have a list of external references - - external_references properties: - external_references.source_name (string) - external_references.description (string) - external_references.url (string) - external_references.hashes (hash, but for filtering purposes, a string) - external_references.external_id (string) - - """ - for er in stix_obj["external_references"]: - # grab er property name from filter field - filter_field = filter_.field.split(".")[1] - r = cls._string(filter_, er[filter_field]) - if r: - return r - return False - - @classmethod - def granular_markings(cls, filter_, stix_obj): - """ - STIX object's can have a list of granular marking references - - granular_markings properties: - granular_markings.marking_ref (id) - granular_markings.selectors (string) - - """ - for gm in stix_obj["granular_markings"]: - # grab gm property name from filter field - filter_field = filter_.field.split(".")[1] - - if filter_field == "marking_ref": - return cls._id(filter_, gm[filter_field]) - - elif filter_field == "selectors": - for selector in gm[filter_field]: - r = cls._string(filter_, selector) - if r: - return r - return False - - @classmethod - def id(cls, filter_, stix_obj): - return cls._id(filter_, stix_obj["id"]) - - @classmethod - def labels(cls, filter_, stix_obj): - for label in stix_obj["labels"]: - r = cls._string(filter_, label) - if r: - return r - return False - - @classmethod - def modified(cls, filter_, stix_obj): - return cls._timestamp(filter_, stix_obj["created"]) - - @classmethod - def object_markings_ref(cls, filter_, stix_obj): - for marking_id in stix_obj["object_market_refs"]: - r = cls._id(filter_, marking_id) - if r: - return r - return False - - @classmethod - def revoked(cls, filter_, stix_obj): - return cls._boolean(filter_, stix_obj["revoked"]) - - @classmethod - def type(cls, filter_, stix_obj): - return cls._string(filter_, stix_obj["type"]) diff --git a/stix2/sources/filesystem.py b/stix2/sources/filesystem.py index 0613ac0..c8ad0b0 100644 --- a/stix2/sources/filesystem.py +++ b/stix2/sources/filesystem.py @@ -13,7 +13,7 @@ import json import os from stix2 import Bundle -from stix2.sources import DataSink, DataSource, DataStore +from stix2.sources import DataSink, DataSource, DataStore, Filter class FileSystemStore(DataStore): @@ -77,13 +77,7 @@ class FileSystemSource(DataSource): def get(self, stix_id, _composite_filters=None): """ """ - query = [ - { - "field": "id", - "op": "=", - "value": stix_id - } - ] + query = [Filter("id", "=", stix_id)] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -95,21 +89,10 @@ class FileSystemSource(DataSource): """ Notes: Since FileSystem sources/sinks don't handle multiple versions - of a STIX object, this operation is futile. Pass call to get(). - (Approved by G.B.) + of a STIX object, this operation is unnecessary. Pass call to get(). """ - # query = [ - # { - # "field": "id", - # "op": "=", - # "value": stix_id - # } - # ] - - # all_data = self.query(query=query, _composite_filters=_composite_filters) - return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)] def query(self, query=None, _composite_filters=None): diff --git a/stix2/sources/filters.py b/stix2/sources/filters.py new file mode 100644 index 0000000..baa32c8 --- /dev/null +++ b/stix2/sources/filters.py @@ -0,0 +1,227 @@ +""" +Filters for Python STIX 2.0 DataSources, DataSinks, DataStores + +Classes: + Filter + +TODO: The script at the bottom of the module works (to capture +all the callable filter methods), however it causes this module +to be imported by itself twice. Not sure how big of deal that is, +or if cleaner solution possible. +""" + +import collections +import types + +import filters + +# Currently, only STIX 2.0 common SDO fields (that are not complex objects) +# are supported for filtering on +STIX_COMMON_FIELDS = [ + "created", + "created_by_ref", + "external_references.source_name", + "external_references.description", + "external_references.url", + "external_references.hashes", + "external_references.external_id", + "granular_markings.marking_ref", + "granular_markings.selectors", + "id", + "labels", + "modified", + "object_marking_refs", + "revoked", + "type", + "granular_markings" +] + +# Supported filter operations +FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<='] + +# Supported filter value types +FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple] + +# filter lookup map - STIX 2 common fields -> filter method +STIX_COMMON_FILTERS_MAP = {} + + +class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])): + __slots__ = () + + def __new__(cls, field, op, value): + # If value is a list, convert it to a tuple so it is hashable. + if isinstance(value, list): + value = tuple(value) + self = super(Filter, cls).__new__(cls, field, op, value) + return self + + +# primitive type filters + +def _all_filter(filter_, stix_obj_field): + """all filter operations (for filters whose value type can be applied to any operation type)""" + if filter_.op == "=": + return stix_obj_field == filter_.value + elif filter_.op == "!=": + return stix_obj_field != filter_.value + elif filter_.op == "in": + return stix_obj_field in filter_.value + elif filter_.op == ">": + return stix_obj_field > filter_.value + elif filter_.op == "<": + return stix_obj_field < filter_.value + elif filter_.op == ">=": + return stix_obj_field >= filter_.value + elif filter_.op == "<=": + return stix_obj_field <= filter_.value + else: + return -1 + + +def _id_filter(filter_, stix_obj_id): + """base filter types""" + if filter_.op == "=": + return stix_obj_id == filter_.value + elif filter_.op == "!=": + return stix_obj_id != filter_.value + else: + return -1 + + +def _boolean_filter(filter_, stix_obj_field): + if filter_.op == "=": + return stix_obj_field == filter_.value + elif filter_.op == "!=": + return stix_obj_field != filter_.value + else: + return -1 + + +def _string_filter(filter_, stix_obj_field): + return _all_filter(filter_, stix_obj_field) + + +def _timestamp_filter(filter_, stix_obj_timestamp): + return _all_filter(filter_, stix_obj_timestamp) + +# STIX 2.0 Common Property filters +# The naming of these functions is important as +# they are used to index a mapping dictionary from +# STIX common field names to these filter functions. +# +# REQUIRED naming scheme: +# "check__filter" + + +def check_created_filter(filter_, stix_obj): + return _timestamp_filter(filter_, stix_obj["created"]) + + +def check_created_by_ref_filter(filter_, stix_obj): + return _id_filter(filter_, stix_obj["created_by_ref"]) + + +def check_external_references_filter(filter_, stix_obj): + """ + STIX object's can have a list of external references + + external_references properties: + external_references.source_name (string) + external_references.description (string) + external_references.url (string) + external_references.hashes (hash, but for filtering purposes, a string) + external_references.external_id (string) + + """ + for er in stix_obj["external_references"]: + # grab er property name from filter field + filter_field = filter_.field.split(".")[1] + r = _string_filter(filter_, er[filter_field]) + if r: + return r + return False + + +def check_granular_markings_filter(filter_, stix_obj): + """ + STIX object's can have a list of granular marking references + + granular_markings properties: + granular_markings.marking_ref (id) + granular_markings.selectors (string) + + """ + for gm in stix_obj["granular_markings"]: + # grab gm property name from filter field + filter_field = filter_.field.split(".")[1] + + if filter_field == "marking_ref": + return _id_filter(filter_, gm[filter_field]) + + elif filter_field == "selectors": + for selector in gm[filter_field]: + r = _string_filter(filter_, selector) + if r: + return r + return False + + +def check_id_filter(filter_, stix_obj): + return _id_filter(filter_, stix_obj["id"]) + + +def check_labels_filter(filter_, stix_obj): + for label in stix_obj["labels"]: + r = _string_filter(filter_, label) + if r: + return r + return False + + +def check_modified_filter(filter_, stix_obj): + return _timestamp_filter(filter_, stix_obj["created"]) + + +def check_object_markings_ref_filter(filter_, stix_obj): + for marking_id in stix_obj["object_market_refs"]: + r = _id_filter(filter_, marking_id) + if r: + return r + return False + + +def check_revoked_filter(filter_, stix_obj): + return _boolean_filter(filter_, stix_obj["revoked"]) + + +def check_type_filter(filter_, stix_obj): + return _string_filter(filter_, stix_obj["type"]) + + +# script to collect STIX common field filter +# functions and create mapping to them + +""" +MK: I want to build the filter name -> filter function dictionary +dynamically whenever it is imported. By enumerating the functions +in this module, extracting the "check*" functions and making +pointers to them. But having issues getting an interable of the +modules entities. globals() works but returns an active dictionary +so iterating over it is a no go +""" + +for entity in dir(filters): + if "check_" in str(entity) and isinstance(filters.__dict__.get(entity), types.FunctionType): + field_name = entity.split("_")[1].split("_")[0] + STIX_COMMON_FILTERS_MAP[field_name] = filters.__dict__.get(entity) + +# Tried this to, didnt work ############## +""" +import sys +for entity in dir(sys.modules[__name__]): + print(entity) + if "check_" in str(entity) and type(entity) == "function": + print(sys.modules[__name__].__dict__.get(entity)) + STIX_COMMON_FILTERS_MAP[str(entity)] = sys.modules[__name__].__dict__.get(entity) +""" diff --git a/stix2/sources/memory.py b/stix2/sources/memory.py index 24f3c1f..bf87517 100644 --- a/stix2/sources/memory.py +++ b/stix2/sources/memory.py @@ -22,7 +22,7 @@ import json import os from stix2 import Bundle -from stix2.sources import DataSink, DataSource, DataStore +from stix2.sources import DataSink, DataSource, DataStore, Filter from stix2validator import validate_string @@ -204,13 +204,7 @@ class MemorySource(DataSource): return stix_obj # if there are filters from the composite level, process full query - query = [ - { - "field": "id", - "op": "=", - "value": stix_id - } - ] + query = [Filter("id", "=", stix_id)] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -223,21 +217,10 @@ class MemorySource(DataSource): """ Notes: Since Memory sources/sinks don't handle multiple versions of a - STIX object, this operation is futile. Translate call to get(). - (Approved by G.B.) + STIX object, this operation is unnecessary. Translate call to get(). """ - # query = [ - # { - # "field": "id", - # "op": "=", - # "value": stix_id - # } - # ] - - # all_data = self.query(query=query, _composite_filters=_composite_filters) - return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)] def query(self, query=None, _composite_filters=None): diff --git a/stix2/sources/taxii.py b/stix2/sources/taxii.py index 47ad8ed..b9dc8c4 100644 --- a/stix2/sources/taxii.py +++ b/stix2/sources/taxii.py @@ -12,7 +12,7 @@ TODO: Test everything import json -from stix2.sources import DataSink, DataSource, DataStore, make_id +from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id TAXII_FILTERS = ['added_after', 'id', 'type', 'version'] @@ -89,16 +89,8 @@ class TAXIICollectionSource(DataSource): """ # make query in TAXII query format since 'id' is TAXII field query = [ - { - "field": "match[id]", - "op": "=", - "value": stix_id - }, - { - "field": "match[version]", - "op": "=", - "value": "all" - } + Filter("match[id]", "=", stix_id), + Filter("match[version]", "=", "all") ] all_data = self.query(query=query, _composite_filters=_composite_filters) @@ -138,11 +130,7 @@ class TAXIICollectionSource(DataSource): For instance - "?match[type]=indicator,sighting" should be in a query dict as follows: - { - "field": "type" - "op": "=", - "value": "indicator,sighting" - } + Filter("type", "=", "indicator,sighting") Args: query (list): list of filters to extract which ones are TAXII From 0e658255a8d0bfa670a0fec8ab8588ec6b51dfd0 Mon Sep 17 00:00:00 2001 From: clenk Date: Wed, 30 Aug 2017 15:33:28 -0400 Subject: [PATCH 6/9] Move check to when custom observable is defined Catch properties named "_ref" or "_refs" that aren't ObjectReferenceProperty when the custom observable is defined, not when it is instantiated. --- stix2/base.py | 16 ++++++---------- stix2/observables.py | 10 ++++++++++ stix2/test/test_custom.py | 35 +++++++++++++++++++++++++---------- 3 files changed, 41 insertions(+), 20 deletions(-) diff --git a/stix2/base.py b/stix2/base.py index 7de193b..185ff35 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -205,18 +205,14 @@ class _Observable(_STIXBase): try: allowed_types = prop.contained.valid_types except AttributeError: - try: - allowed_types = prop.valid_types - except AttributeError: - raise ValueError("'%s' is named like an object reference property but " - "is not an ObjectReferenceProperty or a ListProperty " - "containing ObjectReferenceProperty." % prop_name) + allowed_types = prop.valid_types + + try: + ref_type = self._STIXBase__valid_refs[ref] + except TypeError: + raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__) if allowed_types: - try: - ref_type = self._STIXBase__valid_refs[ref] - except TypeError: - raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__) if ref_type not in allowed_types: raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) diff --git a/stix2/observables.py b/stix2/observables.py index 50eb6d1..4c29785 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -766,6 +766,16 @@ def CustomObservable(type='x-custom-observable', properties={}): _properties = { 'type': TypeProperty(_type), } + # Check properties ending in "_ref/s" are ObjectReferenceProperties + for prop_name, prop in properties.items(): + if prop_name.endswith('_ref') and prop.__class__.__name__ != 'ObjectReferenceProperty': + raise ValueError("'%s' is named like an object reference property but " + "is not an ObjectReferenceProperty." % prop_name) + elif (prop_name.endswith('_refs') and (prop.__class__.__name__ != 'ListProperty' + or prop.contained.__class__.__name__ != 'ObjectReferenceProperty')): + raise ValueError("'%s' is named like an object reference property but " + "is not a ListProperty containing ObjectReferenceProperty." % prop_name) + _properties.update(properties) def __init__(self, **kwargs): diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index d2bb16f..313a0a8 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -153,20 +153,35 @@ def test_custom_observable_object(): def test_custom_observable_object_invalid_ref_property(): - @stix2.observables.CustomObservable('x-new-obs', { - 'property1': stix2.properties.StringProperty(required=True), - 'property_ref': stix2.properties.StringProperty(), - }) - class NewObs(): - pass - with pytest.raises(ValueError) as excinfo: - NewObs(_valid_refs={'1': 'file'}, - property1='something', - property_ref='1') + @stix2.observables.CustomObservable('x-new-obs', { + 'property_ref': stix2.properties.StringProperty(), + }) + class NewObs(): + pass assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value) +def test_custom_observable_object_invalid_refs_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomObservable('x-new-obs', { + 'property_refs': stix2.properties.StringProperty(), + }) + class NewObs(): + pass + assert "is named like an object reference property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + + +def test_custom_observable_object_invalid_refs_list_property(): + with pytest.raises(ValueError) as excinfo: + @stix2.observables.CustomObservable('x-new-obs', { + 'property_refs': stix2.properties.ListProperty(stix2.properties.StringProperty), + }) + class NewObs(): + pass + assert "is named like an object reference property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + + def test_custom_observable_object_invalid_valid_refs(): @stix2.observables.CustomObservable('x-new-obs', { 'property1': stix2.properties.StringProperty(required=True), From 8a33cb7716bd8d567dc1e1e66f8c4a6e2164662f Mon Sep 17 00:00:00 2001 From: clenk Date: Wed, 30 Aug 2017 16:15:05 -0400 Subject: [PATCH 7/9] Touch up ObjectReferenceProperty checks - reword "_refs" error - check with isinstance instead of __class__.__name__ --- stix2/observables.py | 8 ++++---- stix2/test/test_custom.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/stix2/observables.py b/stix2/observables.py index 4c29785..c5290ff 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -768,12 +768,12 @@ def CustomObservable(type='x-custom-observable', properties={}): } # Check properties ending in "_ref/s" are ObjectReferenceProperties for prop_name, prop in properties.items(): - if prop_name.endswith('_ref') and prop.__class__.__name__ != 'ObjectReferenceProperty': + if prop_name.endswith('_ref') and not isinstance(prop, ObjectReferenceProperty): raise ValueError("'%s' is named like an object reference property but " "is not an ObjectReferenceProperty." % prop_name) - elif (prop_name.endswith('_refs') and (prop.__class__.__name__ != 'ListProperty' - or prop.contained.__class__.__name__ != 'ObjectReferenceProperty')): - raise ValueError("'%s' is named like an object reference property but " + elif (prop_name.endswith('_refs') and (not isinstance(prop, ListProperty) + or not isinstance(prop.contained, ObjectReferenceProperty))): + raise ValueError("'%s' is named like an object reference list property but " "is not a ListProperty containing ObjectReferenceProperty." % prop_name) _properties.update(properties) diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 313a0a8..266cfd2 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -169,7 +169,7 @@ def test_custom_observable_object_invalid_refs_property(): }) class NewObs(): pass - assert "is named like an object reference property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) def test_custom_observable_object_invalid_refs_list_property(): @@ -179,7 +179,7 @@ def test_custom_observable_object_invalid_refs_list_property(): }) class NewObs(): pass - assert "is named like an object reference property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) + assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value) def test_custom_observable_object_invalid_valid_refs(): From 7b46283a5cc9b19037861726c69960145175338e Mon Sep 17 00:00:00 2001 From: Greg Back Date: Thu, 31 Aug 2017 18:03:12 +0000 Subject: [PATCH 8/9] Build filter function map --- stix2/sources/__init__.py | 6 ++--- stix2/sources/filters.py | 39 +++++++-------------------------- stix2/sources/memory.py | 3 ++- stix2/sources/taxii.py | 3 ++- stix2/test/test_data_sources.py | 3 ++- 5 files changed, 17 insertions(+), 37 deletions(-) diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 53b005e..7241a0b 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -20,8 +20,8 @@ import uuid from six import iteritems -from filters import (FILTER_OPS, FILTER_VALUE_TYPES, STIX_COMMON_FIELDS, - STIX_COMMON_FILTERS_MAP) +from stix2.sources.filters import (FILTER_OPS, FILTER_VALUE_TYPES, + STIX_COMMON_FIELDS, STIX_COMMON_FILTERS_MAP) def make_id(): @@ -273,7 +273,7 @@ class DataSource(object): clean = False break - match = STIX_COMMON_FILTERS_MAP[filter_.field](filter_, stix_obj) + match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) if not match: clean = False break diff --git a/stix2/sources/filters.py b/stix2/sources/filters.py index baa32c8..7758369 100644 --- a/stix2/sources/filters.py +++ b/stix2/sources/filters.py @@ -13,8 +13,6 @@ or if cleaner solution possible. import collections import types -import filters - # Currently, only STIX 2.0 common SDO fields (that are not complex objects) # are supported for filtering on STIX_COMMON_FIELDS = [ @@ -180,11 +178,11 @@ def check_labels_filter(filter_, stix_obj): def check_modified_filter(filter_, stix_obj): - return _timestamp_filter(filter_, stix_obj["created"]) + return _timestamp_filter(filter_, stix_obj["modified"]) -def check_object_markings_ref_filter(filter_, stix_obj): - for marking_id in stix_obj["object_market_refs"]: +def check_object_marking_refs_filter(filter_, stix_obj): + for marking_id in stix_obj["object_marking_refs"]: r = _id_filter(filter_, marking_id) if r: return r @@ -199,29 +197,8 @@ def check_type_filter(filter_, stix_obj): return _string_filter(filter_, stix_obj["type"]) -# script to collect STIX common field filter -# functions and create mapping to them - -""" -MK: I want to build the filter name -> filter function dictionary -dynamically whenever it is imported. By enumerating the functions -in this module, extracting the "check*" functions and making -pointers to them. But having issues getting an interable of the -modules entities. globals() works but returns an active dictionary -so iterating over it is a no go -""" - -for entity in dir(filters): - if "check_" in str(entity) and isinstance(filters.__dict__.get(entity), types.FunctionType): - field_name = entity.split("_")[1].split("_")[0] - STIX_COMMON_FILTERS_MAP[field_name] = filters.__dict__.get(entity) - -# Tried this to, didnt work ############## -""" -import sys -for entity in dir(sys.modules[__name__]): - print(entity) - if "check_" in str(entity) and type(entity) == "function": - print(sys.modules[__name__].__dict__.get(entity)) - STIX_COMMON_FILTERS_MAP[str(entity)] = sys.modules[__name__].__dict__.get(entity) -""" +# Create mapping of field names to filter functions +for name, obj in dict(globals()).items(): + if "check_" in name and isinstance(obj, types.FunctionType): + field_name = "_".join(name.split("_")[1:-1]) + STIX_COMMON_FILTERS_MAP[field_name] = obj diff --git a/stix2/sources/memory.py b/stix2/sources/memory.py index 9f2fa49..8cf8e20 100644 --- a/stix2/sources/memory.py +++ b/stix2/sources/memory.py @@ -24,7 +24,8 @@ import os from stix2validator import validate_string from stix2 import Bundle -from stix2.sources import DataSink, DataSource, DataStore, Filter +from stix2.sources import DataSink, DataSource, DataStore +from stix2.sources.filters import Filter class MemoryStore(DataStore): diff --git a/stix2/sources/taxii.py b/stix2/sources/taxii.py index b9dc8c4..41632ae 100644 --- a/stix2/sources/taxii.py +++ b/stix2/sources/taxii.py @@ -12,7 +12,8 @@ TODO: Test everything import json -from stix2.sources import DataSink, DataSource, DataStore, Filter, make_id +from stix2.sources import DataSink, DataSource, DataStore, make_id +from stix2.sources.filters import Filter TAXII_FILTERS = ['added_after', 'id', 'type', 'version'] diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index ee37825..79e0c8b 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -2,7 +2,8 @@ import pytest from taxii2client import Collection from stix2.sources import (CompositeDataSource, DataSink, DataSource, - DataStore, Filter, make_id, taxii) + DataStore, make_id, taxii) +from stix2.sources.filters import Filter from stix2.sources.memory import MemorySource COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/' From ba846f95016ae3160c904334a3c7ab4b9ccaa3ce Mon Sep 17 00:00:00 2001 From: Greg Back Date: Thu, 31 Aug 2017 18:23:08 +0000 Subject: [PATCH 9/9] Clean up some tests. --- stix2/sources/__init__.py | 47 +++--- stix2/test/test_data_sources.py | 267 ++++++++++++++++---------------- 2 files changed, 157 insertions(+), 157 deletions(-) diff --git a/stix2/sources/__init__.py b/stix2/sources/__init__.py index 7241a0b..f702748 100644 --- a/stix2/sources/__init__.py +++ b/stix2/sources/__init__.py @@ -253,34 +253,31 @@ class DataSource(object): for stix_obj in stix_objs: clean = True for filter_ in query: - try: - # skip filter as filter was identified (when added) as - # not a common filter - if filter_.field not in STIX_COMMON_FIELDS: - raise Exception("Error, field: {0} is not supported for filtering on.".format(filter_.field)) + # skip filter as filter was identified (when added) as + # not a common filter + if filter_.field not in STIX_COMMON_FIELDS: + raise ValueError("Error, field: {0} is not supported for filtering on.".format(filter_.field)) - # For properties like granular_markings and external_references - # need to break the first property from the string. - if "." in filter_.field: - field = filter_.field.split(".")[0] - else: - field = filter_.field + # For properties like granular_markings and external_references + # need to break the first property from the string. + if "." in filter_.field: + field = filter_.field.split(".")[0] + else: + field = filter_.field - # check filter "field" is in STIX object - if cant be - # applied due to STIX object, STIX object is discarded - # (i.e. did not make it through the filter) - if field not in stix_obj.keys(): - clean = False - break + # check filter "field" is in STIX object - if cant be + # applied due to STIX object, STIX object is discarded + # (i.e. did not make it through the filter) + if field not in stix_obj.keys(): + clean = False + break - match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) - if not match: - clean = False - break - elif match == -1: - raise Exception("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field)) - except Exception as e: - raise ValueError(e) + match = STIX_COMMON_FILTERS_MAP[filter_.field.split('.')[0]](filter_, stix_obj) + if not match: + clean = False + break + elif match == -1: + raise ValueError("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field)) # if object unmarked after all filters, add it if clean: diff --git a/stix2/test/test_data_sources.py b/stix2/test/test_data_sources.py index 79e0c8b..76934fb 100644 --- a/stix2/test/test_data_sources.py +++ b/stix2/test/test_data_sources.py @@ -19,107 +19,110 @@ def collection(): return Collection(COLLECTION_URL, MockTAXIIClient()) -STIX_OBJS1 = [ - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.936Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - } -] +@pytest.fixture +def ds(): + return DataSource() -STIX_OBJS2 = [ - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-31T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - }, - { - "created": "2017-01-27T13:49:53.935Z", - "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", - "labels": [ - "url-watchlist" - ], - "modified": "2017-01-27T13:49:53.935Z", - "name": "Malicious site hosting downloader", - "pattern": "[url:value = 'http://x4z9arb.cn/4712']", - "type": "indicator", - "valid_from": "2017-01-27T13:49:53.935382Z" - } -] + +IND1 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND2 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND3 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.936Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND4 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND5 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND6 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-31T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND7 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} +IND8 = { + "created": "2017-01-27T13:49:53.935Z", + "id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f", + "labels": [ + "url-watchlist" + ], + "modified": "2017-01-27T13:49:53.935Z", + "name": "Malicious site hosting downloader", + "pattern": "[url:value = 'http://x4z9arb.cn/4712']", + "type": "indicator", + "valid_from": "2017-01-27T13:49:53.935382Z" +} + +STIX_OBJS2 = [IND6, IND7, IND8] +STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5] def test_ds_smoke(): @@ -173,7 +176,7 @@ def test_parse_taxii_filters(): assert taxii_filters == expected_params -def test_add_get_remove_filter(): +def test_add_get_remove_filter(ds): # First 3 filters are valid, remaining fields are erroneous in some way valid_filters = [ @@ -187,8 +190,6 @@ def test_add_get_remove_filter(): Filter('created', '=', object()), ] - ds = DataSource() - assert len(ds.filters) == 0 ds.add_filter(valid_filters[0]) @@ -226,7 +227,7 @@ def test_add_get_remove_filter(): ds.add_filters(valid_filters) -def test_apply_common_filters(): +def test_apply_common_filters(ds): stix_objs = [ { "created": "2017-01-27T13:49:53.997Z", @@ -287,8 +288,6 @@ def test_apply_common_filters(): Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), ] - ds = DataSource() - resp = ds.apply_common_filters(stix_objs, [filters[0]]) ids = [r['id'] for r in resp] assert stix_objs[0]['id'] in ids @@ -328,61 +327,65 @@ def test_apply_common_filters(): assert resp[0]['id'] == stix_objs[2]['id'] assert len(resp) == 1 - # These are used with STIX_OBJS2 - more_filters = [ - Filter("modified", "<", "2017-01-28T13:49:53.935Z"), - Filter("modified", ">", "2017-01-28T13:49:53.935Z"), - Filter("modified", ">=", "2017-01-27T13:49:53.935Z"), - Filter("modified", "<=", "2017-01-27T13:49:53.935Z"), - Filter("modified", "?", "2017-01-27T13:49:53.935Z"), - Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"), - Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"), - Filter("notacommonproperty", "=", "bar"), - ] - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[0]]) +def test_filters0(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[1]['id'] assert len(resp) == 2 - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[1]]) + +def test_filters1(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert len(resp) == 1 - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[2]]) + +def test_filters2(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert len(resp) == 3 - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[3]]) + +def test_filters3(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]) assert resp[0]['id'] == STIX_OBJS2[1]['id'] assert len(resp) == 2 + +def test_filters4(ds): + fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z") with pytest.raises(ValueError) as excinfo: - ds.apply_common_filters(STIX_OBJS2, [more_filters[4]]) + ds.apply_common_filters(STIX_OBJS2, [fltr4]) assert str(excinfo.value) == ("Error, filter operator: {0} not supported " - "for specified field: {1}").format(more_filters[4].op, - more_filters[4].field) + "for specified field: {1}").format(fltr4.op, fltr4.field) - resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[5]]) + +def test_filters5(ds): + resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")]) assert resp[0]['id'] == STIX_OBJS2[0]['id'] assert len(resp) == 1 + +def test_filters6(ds): + fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f") with pytest.raises(ValueError) as excinfo: - ds.apply_common_filters(STIX_OBJS2, [more_filters[6]]) + ds.apply_common_filters(STIX_OBJS2, [fltr6]) assert str(excinfo.value) == ("Error, filter operator: {0} not supported " - "for specified field: {1}").format(more_filters[6].op, - more_filters[6].field) + "for specified field: {1}").format(fltr6.op, fltr6.field) + +def test_filters7(ds): + fltr7 = Filter("notacommonproperty", "=", "bar") with pytest.raises(ValueError) as excinfo: - ds.apply_common_filters(STIX_OBJS2, [more_filters[7]]) + ds.apply_common_filters(STIX_OBJS2, [fltr7]) assert str(excinfo.value) == ("Error, field: {0} is not supported for " - "filtering on.".format(more_filters[7].field)) + "filtering on.".format(fltr7.field)) -def test_deduplicate(): - ds = DataSource() +def test_deduplicate(ds): unique = ds.deduplicate(STIX_OBJS1) # Only 3 objects are unique