Move ObservableProperty, ExtensionsProperty, and Observable parsing code

into observables.py to prevent circular imports and fix #23.
stix2.1
clenk 2017-07-14 14:55:57 -04:00
parent 6f680be8a6
commit e1330692c8
6 changed files with 186 additions and 170 deletions

View File

@ -5,22 +5,24 @@
from . import exceptions
from .bundle import Bundle
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, Directory, DomainName,
EmailAddress, EmailMessage, EmailMIMEComponent, File,
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
Process, RasterImageExt, SocketExt, Software, TCPExt,
AutonomousSystem, CustomObservable, Directory,
DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, File, HTTPRequestExt, ICMPExt,
IPv4Address, IPv6Address, MACAddress, Mutex,
NetworkTraffic, NTFSExt, PDFExt, Process,
RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType)
X509Certificate, X509V3ExtenstionsType,
parse_observable)
from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
IntrusionSet, Malware, ObservedData, Report, ThreatActor,
Tool, Vulnerability)
from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject,
Identity, Indicator, IntrusionSet, Malware, ObservedData,
Report, ThreatActor, Tool, Vulnerability)
from .sro import Relationship, Sighting
from .utils import get_dict
from .version import __version__
@ -43,59 +45,6 @@ OBJ_MAP = {
'vulnerability': Vulnerability,
}
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
}
def parse(data, allow_custom=False):
"""Deserialize a string or file-like object into a STIX object.
@ -120,47 +69,8 @@ def parse(data, allow_custom=False):
return obj_class(allow_custom=allow_custom, **obj)
def parse_observable(data, _valid_refs=[], allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable object.
Args:
data: The STIX 2 string to be parsed.
_valid_refs: A list of object references valid for the scope of the object being parsed.
allow_custom: Whether to allow custom properties or not. Default: False.
Returns:
An instantiated Python STIX Cyber Observable object.
"""
obj = get_dict(data)
obj['_valid_refs'] = _valid_refs
if 'type' not in obj:
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise exceptions.ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)
def _register_type(new_type):
"""Register a custom STIX Object type.
"""
OBJ_MAP[new_type._type] = new_type
def _register_observable(new_observable):
"""Register a custom STIX Cyber Observable type.
"""
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable

View File

@ -5,16 +5,72 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable
and do not have a '_type' attribute.
"""
import stix2
from .base import _Extension, _Observable, _STIXBase
from .exceptions import AtLeastOnePropertyError, DependentPropertiesError
from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ParseError)
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty,
ExtensionsProperty, FloatProperty, HashesProperty,
HexProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
EmbeddedObjectProperty, EnumProperty, FloatProperty,
HashesProperty, HexProperty, IntegerProperty,
ListProperty, ObjectReferenceProperty, Property,
StringProperty, TimestampProperty, TypeProperty)
from .utils import get_dict
class ObservableProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
# from .__init__ import parse_observable # avoid circular import
for key, obj in dictified.items():
parsed_obj = parse_observable(obj, valid_refs)
if not issubclass(type(parsed_obj), _Observable):
raise ValueError("Objects in an observable property must be "
"Cyber Observable Objects")
dictified[key] = parsed_obj
return dictified
class ExtensionsProperty(DictionaryProperty):
""" Property for representing extensions on Observable objects
"""
def __init__(self, enclosing_type=None, required=False):
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary")
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")
else:
raise ValueError("The key used in the extensions dictionary is not an extension type name")
else:
raise ValueError("The enclosing type has no extensions defined")
return dictified
class Artifact(_Observable):
@ -590,9 +646,101 @@ class X509Certificate(_Observable):
}
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
}
def parse_observable(data, _valid_refs=[], allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable object.
Args:
data: The STIX 2 string to be parsed.
_valid_refs: A list of object references valid for the scope of the object being parsed.
allow_custom: Whether to allow custom properties or not. Default: False.
Returns:
An instantiated Python STIX Cyber Observable object.
"""
obj = get_dict(data)
obj['_valid_refs'] = _valid_refs
if 'type' not in obj:
raise ParseError("Can't parse object with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
raise ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)
def _register_observable(new_observable):
"""Register a custom STIX Cyber Observable type.
"""
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable
def CustomObservable(type='x-custom-observable', properties={}):
"""Custom STIX Cyber Observable type decorator
"""
def custom_builder(cls):
@ -608,7 +756,7 @@ def CustomObservable(type='x-custom-observable', properties={}):
_Observable.__init__(self, **kwargs)
cls.__init__(self, **kwargs)
stix2._register_observable(_Custom)
_register_observable(_Custom)
return _Custom
return custom_builder

View File

@ -7,7 +7,7 @@ import uuid
from six import text_type
from .base import _Observable, _STIXBase
from .base import _STIXBase
from .exceptions import DictionaryKeyError
from .utils import get_dict, parse_into_datetime
@ -220,29 +220,6 @@ class TimestampProperty(Property):
return parse_into_datetime(value, self.precision)
class ObservableProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
from .__init__ import parse_observable # avoid circular import
for key, obj in dictified.items():
parsed_obj = parse_observable(obj, valid_refs)
if not issubclass(type(parsed_obj), _Observable):
raise ValueError("Objects in an observable property must be "
"Cyber Observable Objects")
dictified[key] = parsed_obj
return dictified
class DictionaryProperty(Property):
def clean(self, value):
@ -393,35 +370,3 @@ class EnumProperty(StringProperty):
if value not in self.allowed:
raise ValueError("value '%s' is not valid for this enumeration." % value)
return self.string_type(value)
class ExtensionsProperty(DictionaryProperty):
def __init__(self, enclosing_type=None, required=False):
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
if dictified == {}:
raise ValueError("The dictionary property must contain a non-empty dictionary")
from .__init__ import EXT_MAP # avoid circular import
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")
else:
raise ValueError("The key used in the extensions dictionary is not an extension type name")
else:
raise ValueError("The enclosing type has no extensions defined")
return dictified

View File

@ -4,9 +4,10 @@ import stix2
from .base import _STIXBase
from .common import COMMON_PROPERTIES
from .observables import ObservableProperty
from .other import KillChainPhase
from .properties import (IDProperty, IntegerProperty, ListProperty,
ObservableProperty, ReferenceProperty, StringProperty,
ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
from .utils import NOW

View File

@ -166,3 +166,15 @@ def test_observable_custom_property_allowed():
allow_custom=True,
)
assert no.x_foo == "bar"
def test_observed_data_with_custom_observable_object():
no = NewObservable(property1='something')
ob_data = stix2.ObservedData(
first_observed=stix2.utils.NOW,
last_observed=stix2.utils.NOW,
number_observed=1,
objects={'0': no},
allow_custom=True,
)
assert ob_data.objects['0'].property1 == 'something'

View File

@ -2,10 +2,10 @@ import pytest
from stix2 import TCPExt
from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
from stix2.observables import EmailMIMEComponent
from stix2.observables import EmailMIMEComponent, ExtensionsProperty
from stix2.properties import (BinaryProperty, BooleanProperty,
DictionaryProperty, EmbeddedObjectProperty,
EnumProperty, ExtensionsProperty, HashesProperty,
EnumProperty, HashesProperty,
HexProperty, IDProperty, IntegerProperty,
ListProperty, Property, ReferenceProperty,
StringProperty, TimestampProperty, TypeProperty)