From e1330692c8c40f9860a8e47aef065999738d507e Mon Sep 17 00:00:00 2001 From: clenk Date: Fri, 14 Jul 2017 14:55:57 -0400 Subject: [PATCH] Move ObservableProperty, ExtensionsProperty, and Observable parsing code into observables.py to prevent circular imports and fix #23. --- stix2/__init__.py | 112 +++-------------------- stix2/observables.py | 168 ++++++++++++++++++++++++++++++++-- stix2/properties.py | 57 +----------- stix2/sdo.py | 3 +- stix2/test/test_custom.py | 12 +++ stix2/test/test_properties.py | 4 +- 6 files changed, 186 insertions(+), 170 deletions(-) diff --git a/stix2/__init__.py b/stix2/__init__.py index 904af9c..18c0b33 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -5,22 +5,24 @@ from . import exceptions from .bundle import Bundle from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, - AutonomousSystem, Directory, DomainName, - EmailAddress, EmailMessage, EmailMIMEComponent, File, - HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address, - MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt, - Process, RasterImageExt, SocketExt, Software, TCPExt, + AutonomousSystem, CustomObservable, Directory, + DomainName, EmailAddress, EmailMessage, + EmailMIMEComponent, File, HTTPRequestExt, ICMPExt, + IPv4Address, IPv6Address, MACAddress, Mutex, + NetworkTraffic, NTFSExt, PDFExt, Process, + RasterImageExt, SocketExt, Software, TCPExt, UNIXAccountExt, UserAccount, WindowsPEBinaryExt, WindowsPEOptionalHeaderType, WindowsPESection, WindowsProcessExt, WindowsRegistryKey, WindowsRegistryValueType, WindowsServiceExt, - X509Certificate, X509V3ExtenstionsType) + X509Certificate, X509V3ExtenstionsType, + parse_observable) from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) -from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator, - IntrusionSet, Malware, ObservedData, Report, ThreatActor, - Tool, Vulnerability) +from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, + Identity, Indicator, IntrusionSet, Malware, ObservedData, + Report, ThreatActor, Tool, Vulnerability) from .sro import Relationship, Sighting from .utils import get_dict from .version import __version__ @@ -43,59 +45,6 @@ OBJ_MAP = { 'vulnerability': Vulnerability, } -OBJ_MAP_OBSERVABLE = { - 'artifact': Artifact, - 'autonomous-system': AutonomousSystem, - 'directory': Directory, - 'domain-name': DomainName, - 'email-address': EmailAddress, - 'email-message': EmailMessage, - 'file': File, - 'ipv4-addr': IPv4Address, - 'ipv6-addr': IPv6Address, - 'mac-addr': MACAddress, - 'mutex': Mutex, - 'network-traffic': NetworkTraffic, - 'process': Process, - 'software': Software, - 'url': URL, - 'user-account': UserAccount, - 'windows-registry-key': WindowsRegistryKey, - 'x509-certificate': X509Certificate, -} - -EXT_MAP_FILE = { - 'archive-ext': ArchiveExt, - 'ntfs-ext': NTFSExt, - 'pdf-ext': PDFExt, - 'raster-image-ext': RasterImageExt, - 'windows-pebinary-ext': WindowsPEBinaryExt -} - -EXT_MAP_NETWORK_TRAFFIC = { - 'http-request-ext': HTTPRequestExt, - 'icmp-ext': ICMPExt, - 'socket-ext': SocketExt, - 'tcp-ext': TCPExt, -} - -EXT_MAP_PROCESS = { - 'windows-process-ext': WindowsProcessExt, - 'windows-service-ext': WindowsServiceExt, -} - -EXT_MAP_USER_ACCOUNT = { - 'unix-account-ext': UNIXAccountExt, -} - -EXT_MAP = { - 'file': EXT_MAP_FILE, - 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, - 'process': EXT_MAP_PROCESS, - 'user-account': EXT_MAP_USER_ACCOUNT, - -} - def parse(data, allow_custom=False): """Deserialize a string or file-like object into a STIX object. @@ -120,47 +69,8 @@ def parse(data, allow_custom=False): return obj_class(allow_custom=allow_custom, **obj) -def parse_observable(data, _valid_refs=[], allow_custom=False): - """Deserialize a string or file-like object into a STIX Cyber Observable object. - - Args: - data: The STIX 2 string to be parsed. - _valid_refs: A list of object references valid for the scope of the object being parsed. - allow_custom: Whether to allow custom properties or not. Default: False. - - Returns: - An instantiated Python STIX Cyber Observable object. - """ - - obj = get_dict(data) - obj['_valid_refs'] = _valid_refs - - if 'type' not in obj: - raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) - try: - obj_class = OBJ_MAP_OBSERVABLE[obj['type']] - except KeyError: - raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type']) - - if 'extensions' in obj and obj['type'] in EXT_MAP: - for name, ext in obj['extensions'].items(): - if name not in EXT_MAP[obj['type']]: - raise exceptions.ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) - ext_class = EXT_MAP[obj['type']][name] - obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) - - return obj_class(allow_custom=allow_custom, **obj) - - def _register_type(new_type): """Register a custom STIX Object type. """ OBJ_MAP[new_type._type] = new_type - - -def _register_observable(new_observable): - """Register a custom STIX Cyber Observable type. - """ - - OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable diff --git a/stix2/observables.py b/stix2/observables.py index a8f3b67..e38e298 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -5,16 +5,72 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable and do not have a '_type' attribute. """ -import stix2 - from .base import _Extension, _Observable, _STIXBase -from .exceptions import AtLeastOnePropertyError, DependentPropertiesError +from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, + ParseError) from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, - EmbeddedObjectProperty, EnumProperty, - ExtensionsProperty, FloatProperty, HashesProperty, - HexProperty, IntegerProperty, ListProperty, - ObjectReferenceProperty, StringProperty, - TimestampProperty, TypeProperty) + EmbeddedObjectProperty, EnumProperty, FloatProperty, + HashesProperty, HexProperty, IntegerProperty, + ListProperty, ObjectReferenceProperty, Property, + StringProperty, TimestampProperty, TypeProperty) +from .utils import get_dict + + +class ObservableProperty(Property): + + def clean(self, value): + try: + dictified = get_dict(value) + except ValueError: + raise ValueError("The observable property must contain a dictionary") + if dictified == {}: + raise ValueError("The dictionary property must contain a non-empty dictionary") + + valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) + + # from .__init__ import parse_observable # avoid circular import + for key, obj in dictified.items(): + parsed_obj = parse_observable(obj, valid_refs) + if not issubclass(type(parsed_obj), _Observable): + raise ValueError("Objects in an observable property must be " + "Cyber Observable Objects") + dictified[key] = parsed_obj + + return dictified + + +class ExtensionsProperty(DictionaryProperty): + """ Property for representing extensions on Observable objects + """ + + def __init__(self, enclosing_type=None, required=False): + self.enclosing_type = enclosing_type + super(ExtensionsProperty, self).__init__(required) + + def clean(self, value): + try: + dictified = get_dict(value) + except ValueError: + raise ValueError("The extensions property must contain a dictionary") + if dictified == {}: + raise ValueError("The dictionary property must contain a non-empty dictionary") + + if self.enclosing_type in EXT_MAP: + specific_type_map = EXT_MAP[self.enclosing_type] + for key, subvalue in dictified.items(): + if key in specific_type_map: + cls = specific_type_map[key] + if type(subvalue) is dict: + dictified[key] = cls(**subvalue) + elif type(subvalue) is cls: + dictified[key] = subvalue + else: + raise ValueError("Cannot determine extension type.") + else: + raise ValueError("The key used in the extensions dictionary is not an extension type name") + else: + raise ValueError("The enclosing type has no extensions defined") + return dictified class Artifact(_Observable): @@ -590,9 +646,101 @@ class X509Certificate(_Observable): } +OBJ_MAP_OBSERVABLE = { + 'artifact': Artifact, + 'autonomous-system': AutonomousSystem, + 'directory': Directory, + 'domain-name': DomainName, + 'email-address': EmailAddress, + 'email-message': EmailMessage, + 'file': File, + 'ipv4-addr': IPv4Address, + 'ipv6-addr': IPv6Address, + 'mac-addr': MACAddress, + 'mutex': Mutex, + 'network-traffic': NetworkTraffic, + 'process': Process, + 'software': Software, + 'url': URL, + 'user-account': UserAccount, + 'windows-registry-key': WindowsRegistryKey, + 'x509-certificate': X509Certificate, +} + +EXT_MAP_FILE = { + 'archive-ext': ArchiveExt, + 'ntfs-ext': NTFSExt, + 'pdf-ext': PDFExt, + 'raster-image-ext': RasterImageExt, + 'windows-pebinary-ext': WindowsPEBinaryExt +} + +EXT_MAP_NETWORK_TRAFFIC = { + 'http-request-ext': HTTPRequestExt, + 'icmp-ext': ICMPExt, + 'socket-ext': SocketExt, + 'tcp-ext': TCPExt, +} + +EXT_MAP_PROCESS = { + 'windows-process-ext': WindowsProcessExt, + 'windows-service-ext': WindowsServiceExt, +} + +EXT_MAP_USER_ACCOUNT = { + 'unix-account-ext': UNIXAccountExt, +} + +EXT_MAP = { + 'file': EXT_MAP_FILE, + 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, + 'process': EXT_MAP_PROCESS, + 'user-account': EXT_MAP_USER_ACCOUNT, + +} + + +def parse_observable(data, _valid_refs=[], allow_custom=False): + """Deserialize a string or file-like object into a STIX Cyber Observable object. + + Args: + data: The STIX 2 string to be parsed. + _valid_refs: A list of object references valid for the scope of the object being parsed. + allow_custom: Whether to allow custom properties or not. Default: False. + + Returns: + An instantiated Python STIX Cyber Observable object. + """ + + obj = get_dict(data) + obj['_valid_refs'] = _valid_refs + + if 'type' not in obj: + raise ParseError("Can't parse object with no 'type' property: %s" % str(obj)) + try: + obj_class = OBJ_MAP_OBSERVABLE[obj['type']] + except KeyError: + raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type']) + + if 'extensions' in obj and obj['type'] in EXT_MAP: + for name, ext in obj['extensions'].items(): + if name not in EXT_MAP[obj['type']]: + raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) + ext_class = EXT_MAP[obj['type']][name] + obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name]) + + return obj_class(allow_custom=allow_custom, **obj) + + +def _register_observable(new_observable): + """Register a custom STIX Cyber Observable type. + """ + + OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable + + def CustomObservable(type='x-custom-observable', properties={}): """Custom STIX Cyber Observable type decorator - """ def custom_builder(cls): @@ -608,7 +756,7 @@ def CustomObservable(type='x-custom-observable', properties={}): _Observable.__init__(self, **kwargs) cls.__init__(self, **kwargs) - stix2._register_observable(_Custom) + _register_observable(_Custom) return _Custom return custom_builder diff --git a/stix2/properties.py b/stix2/properties.py index 80e5345..35c239a 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -7,7 +7,7 @@ import uuid from six import text_type -from .base import _Observable, _STIXBase +from .base import _STIXBase from .exceptions import DictionaryKeyError from .utils import get_dict, parse_into_datetime @@ -220,29 +220,6 @@ class TimestampProperty(Property): return parse_into_datetime(value, self.precision) -class ObservableProperty(Property): - - def clean(self, value): - try: - dictified = get_dict(value) - except ValueError: - raise ValueError("The observable property must contain a dictionary") - if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") - - valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) - - from .__init__ import parse_observable # avoid circular import - for key, obj in dictified.items(): - parsed_obj = parse_observable(obj, valid_refs) - if not issubclass(type(parsed_obj), _Observable): - raise ValueError("Objects in an observable property must be " - "Cyber Observable Objects") - dictified[key] = parsed_obj - - return dictified - - class DictionaryProperty(Property): def clean(self, value): @@ -393,35 +370,3 @@ class EnumProperty(StringProperty): if value not in self.allowed: raise ValueError("value '%s' is not valid for this enumeration." % value) return self.string_type(value) - - -class ExtensionsProperty(DictionaryProperty): - def __init__(self, enclosing_type=None, required=False): - self.enclosing_type = enclosing_type - super(ExtensionsProperty, self).__init__(required) - - def clean(self, value): - try: - dictified = get_dict(value) - except ValueError: - raise ValueError("The extensions property must contain a dictionary") - if dictified == {}: - raise ValueError("The dictionary property must contain a non-empty dictionary") - - from .__init__ import EXT_MAP # avoid circular import - if self.enclosing_type in EXT_MAP: - specific_type_map = EXT_MAP[self.enclosing_type] - for key, subvalue in dictified.items(): - if key in specific_type_map: - cls = specific_type_map[key] - if type(subvalue) is dict: - dictified[key] = cls(**subvalue) - elif type(subvalue) is cls: - dictified[key] = subvalue - else: - raise ValueError("Cannot determine extension type.") - else: - raise ValueError("The key used in the extensions dictionary is not an extension type name") - else: - raise ValueError("The enclosing type has no extensions defined") - return dictified diff --git a/stix2/sdo.py b/stix2/sdo.py index 1ec3b21..9f2871e 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -4,9 +4,10 @@ import stix2 from .base import _STIXBase from .common import COMMON_PROPERTIES +from .observables import ObservableProperty from .other import KillChainPhase from .properties import (IDProperty, IntegerProperty, ListProperty, - ObservableProperty, ReferenceProperty, StringProperty, + ReferenceProperty, StringProperty, TimestampProperty, TypeProperty) from .utils import NOW diff --git a/stix2/test/test_custom.py b/stix2/test/test_custom.py index 1a816bd..934e1ce 100644 --- a/stix2/test/test_custom.py +++ b/stix2/test/test_custom.py @@ -166,3 +166,15 @@ def test_observable_custom_property_allowed(): allow_custom=True, ) assert no.x_foo == "bar" + + +def test_observed_data_with_custom_observable_object(): + no = NewObservable(property1='something') + ob_data = stix2.ObservedData( + first_observed=stix2.utils.NOW, + last_observed=stix2.utils.NOW, + number_observed=1, + objects={'0': no}, + allow_custom=True, + ) + assert ob_data.objects['0'].property1 == 'something' diff --git a/stix2/test/test_properties.py b/stix2/test/test_properties.py index 5395a9f..3a1446b 100644 --- a/stix2/test/test_properties.py +++ b/stix2/test/test_properties.py @@ -2,10 +2,10 @@ import pytest from stix2 import TCPExt from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError -from stix2.observables import EmailMIMEComponent +from stix2.observables import EmailMIMEComponent, ExtensionsProperty from stix2.properties import (BinaryProperty, BooleanProperty, DictionaryProperty, EmbeddedObjectProperty, - EnumProperty, ExtensionsProperty, HashesProperty, + EnumProperty, HashesProperty, HexProperty, IDProperty, IntegerProperty, ListProperty, Property, ReferenceProperty, StringProperty, TimestampProperty, TypeProperty)