From ee8013d782a456a26f07578631b77552dd1d1fe6 Mon Sep 17 00:00:00 2001 From: clenk Date: Fri, 11 Aug 2017 16:18:20 -0400 Subject: [PATCH] Parse bundles correctly This required refactoring parts of the library. Code in __init__.py merged into bundle.py, which was renamed core.py. Code in other.py was merged into common.py. Fixes #40. --- stix2/__init__.py | 56 ++--------------- stix2/bundle.py | 25 -------- stix2/common.py | 129 ++++++++++++++++++++++++++++++++++++-- stix2/core.py | 99 +++++++++++++++++++++++++++++ stix2/other.py | 128 ------------------------------------- stix2/properties.py | 3 + stix2/sdo.py | 3 +- stix2/test/test_bundle.py | 12 ++++ 8 files changed, 244 insertions(+), 211 deletions(-) delete mode 100644 stix2/bundle.py create mode 100644 stix2/core.py delete mode 100644 stix2/other.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 98697a9..b9b6764 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -3,7 +3,10 @@ # flake8: noqa from . import exceptions -from .bundle import Bundle +from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, + ExternalReference, GranularMarking, KillChainPhase, + MarkingDefinition, StatementMarking, TLPMarking) +from .core import Bundle, _register_type, parse from .environment import ObjectFactory from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, @@ -18,9 +21,6 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, WindowsRegistryValueType, WindowsServiceExt, X509Certificate, X509V3ExtenstionsType, parse_observable) -from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, - ExternalReference, GranularMarking, KillChainPhase, - MarkingDefinition, StatementMarking, TLPMarking) from .patterns import (AndBooleanExpression, AndObservationExpression, BasicObjectPathComponent, EqualityComparisonExpression, FloatConstant, FollowedByObservationExpression, @@ -44,51 +44,3 @@ from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, from .sro import Relationship, Sighting from .utils import get_dict from .version import __version__ - -OBJ_MAP = { - 'attack-pattern': AttackPattern, - 'campaign': Campaign, - 'course-of-action': CourseOfAction, - 'identity': Identity, - 'indicator': Indicator, - 'intrusion-set': IntrusionSet, - 'malware': Malware, - 'marking-definition': MarkingDefinition, - 'observed-data': ObservedData, - 'report': Report, - 'relationship': Relationship, - 'threat-actor': ThreatActor, - 'tool': Tool, - 'sighting': Sighting, - 'vulnerability': Vulnerability, -} - - -def parse(data, allow_custom=False): - """Deserialize a string or file-like object into a STIX object. - - Args: - data: The STIX 2 string to be parsed. - allow_custom (bool): Whether to allow custom properties or not. Default: False. - - Returns: - An instantiated Python STIX object. - """ - - obj = get_dict(data) - - if 'type' not in obj: - raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) - - try: - obj_class = OBJ_MAP[obj['type']] - except KeyError: - raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type']) - return obj_class(allow_custom=allow_custom, **obj) - - -def _register_type(new_type): - """Register a custom STIX Object type. - """ - - OBJ_MAP[new_type._type] = new_type diff --git a/stix2/bundle.py b/stix2/bundle.py deleted file mode 100644 index b598ceb..0000000 --- a/stix2/bundle.py +++ /dev/null @@ -1,25 +0,0 @@ -"""STIX 2 Bundle object""" - -from .base import _STIXBase -from .properties import IDProperty, Property, TypeProperty - - -class Bundle(_STIXBase): - - _type = 'bundle' - _properties = { - 'type': TypeProperty(_type), - 'id': IDProperty(_type), - 'spec_version': Property(fixed="2.0"), - 'objects': Property(), - } - - def __init__(self, *args, **kwargs): - # Add any positional arguments to the 'objects' kwarg. - if args: - if isinstance(args[0], list): - kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) - else: - kwargs['objects'] = list(args) + kwargs.get('objects', []) - - super(Bundle, self).__init__(**kwargs) diff --git a/stix2/common.py b/stix2/common.py index 7c6e747..57f99d0 100644 --- a/stix2/common.py +++ b/stix2/common.py @@ -1,10 +1,131 @@ """STIX 2 Common Data Types and Properties""" -from .other import ExternalReference, GranularMarking -from .properties import (BooleanProperty, ListProperty, ReferenceProperty, - StringProperty, TimestampProperty) -from .utils import NOW +from .base import _STIXBase +from .properties import (BooleanProperty, IDProperty, ListProperty, Property, + ReferenceProperty, SelectorProperty, StringProperty, + TimestampProperty, TypeProperty) +from .utils import NOW, get_dict + +class ExternalReference(_STIXBase): + _properties = { + 'source_name': StringProperty(required=True), + 'description': StringProperty(), + 'url': StringProperty(), + 'external_id': StringProperty(), + } + + def _check_object_constraints(self): + super(ExternalReference, self)._check_object_constraints() + self._check_at_least_one_property(["description", "external_id", "url"]) + + +class KillChainPhase(_STIXBase): + _properties = { + 'kill_chain_name': StringProperty(required=True), + 'phase_name': StringProperty(required=True), + } + + +class GranularMarking(_STIXBase): + _properties = { + 'marking_ref': ReferenceProperty(required=True, type="marking-definition"), + 'selectors': ListProperty(SelectorProperty, required=True), + } + + +class TLPMarking(_STIXBase): + # TODO: don't allow the creation of any other TLPMarkings than the ones below + _properties = { + 'tlp': Property(required=True) + } + + +class StatementMarking(_STIXBase): + _properties = { + 'statement': StringProperty(required=True) + } + + def __init__(self, statement=None, **kwargs): + # Allow statement as positional args. + if statement and not kwargs.get('statement'): + kwargs['statement'] = statement + + super(StatementMarking, self).__init__(**kwargs) + + +class MarkingProperty(Property): + """Represent the marking objects in the `definition` property of + marking-definition objects. + """ + + def clean(self, value): + if type(value) in [TLPMarking, StatementMarking]: + return value + else: + raise ValueError("must be a Statement or TLP Marking.") + + +class MarkingDefinition(_STIXBase): + _type = 'marking-definition' + _properties = { + 'created': TimestampProperty(default=lambda: NOW), + 'external_references': ListProperty(ExternalReference), + 'created_by_ref': ReferenceProperty(type="identity"), + 'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")), + 'granular_markings': ListProperty(GranularMarking), + 'type': TypeProperty(_type), + 'id': IDProperty(_type), + 'definition_type': StringProperty(required=True), + 'definition': MarkingProperty(required=True), + } + marking_map = { + 'tlp': TLPMarking, + 'statement': StatementMarking, + } + + def __init__(self, **kwargs): + if set(('definition_type', 'definition')).issubset(kwargs.keys()): + # Create correct marking type object + try: + marking_type = self.marking_map[kwargs['definition_type']] + except KeyError: + raise ValueError("definition_type must be a valid marking type") + + if not isinstance(kwargs['definition'], marking_type): + defn = get_dict(kwargs['definition']) + kwargs['definition'] = marking_type(**defn) + + super(MarkingDefinition, self).__init__(**kwargs) + + +TLP_WHITE = MarkingDefinition( + id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", + created="2017-01-20T00:00:00.000Z", + definition_type="tlp", + definition=TLPMarking(tlp="white") +) + +TLP_GREEN = MarkingDefinition( + id="marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da", + created="2017-01-20T00:00:00.000Z", + definition_type="tlp", + definition=TLPMarking(tlp="green") +) + +TLP_AMBER = MarkingDefinition( + id="marking-definition--f88d31f6-486f-44da-b317-01333bde0b82", + created="2017-01-20T00:00:00.000Z", + definition_type="tlp", + definition=TLPMarking(tlp="amber") +) + +TLP_RED = MarkingDefinition( + id="marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", + created="2017-01-20T00:00:00.000Z", + definition_type="tlp", + definition=TLPMarking(tlp="red") +) COMMON_PROPERTIES = { # 'type' and 'id' should be defined on each individual type 'created': TimestampProperty(default=lambda: NOW, precision='millisecond'), diff --git a/stix2/core.py b/stix2/core.py new file mode 100644 index 0000000..81dd492 --- /dev/null +++ b/stix2/core.py @@ -0,0 +1,99 @@ +"""STIX 2.0 Objects that are neither SDOs nor SROs""" + + +from . import exceptions +from .base import _STIXBase +from .common import MarkingDefinition +from .properties import IDProperty, ListProperty, Property, TypeProperty +from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator, + IntrusionSet, Malware, ObservedData, Report, ThreatActor, + Tool, Vulnerability) +from .sro import Relationship, Sighting +from .utils import get_dict + + +class STIXObjectProperty(Property): + + def clean(self, value): + try: + dictified = get_dict(value) + except ValueError: + raise ValueError("This property may only contain a dictionary or object") + if dictified == {}: + raise ValueError("This property may only contain a non-empty dictionary or object") + if 'type' in dictified and dictified['type'] == 'bundle': + raise ValueError('This property may not contain a Bundle object') + + parsed_obj = parse(dictified) + return parsed_obj + + +class Bundle(_STIXBase): + + _type = 'bundle' + _properties = { + 'type': TypeProperty(_type), + 'id': IDProperty(_type), + 'spec_version': Property(fixed="2.0"), + 'objects': ListProperty(STIXObjectProperty), + } + + def __init__(self, *args, **kwargs): + # Add any positional arguments to the 'objects' kwarg. + if args: + if isinstance(args[0], list): + kwargs['objects'] = args[0] + list(args[1:]) + kwargs.get('objects', []) + else: + kwargs['objects'] = list(args) + kwargs.get('objects', []) + + super(Bundle, self).__init__(**kwargs) + + +OBJ_MAP = { + 'attack-pattern': AttackPattern, + 'bundle': Bundle, + 'campaign': Campaign, + 'course-of-action': CourseOfAction, + 'identity': Identity, + 'indicator': Indicator, + 'intrusion-set': IntrusionSet, + 'malware': Malware, + 'marking-definition': MarkingDefinition, + 'observed-data': ObservedData, + 'report': Report, + 'relationship': Relationship, + 'threat-actor': ThreatActor, + 'tool': Tool, + 'sighting': Sighting, + 'vulnerability': Vulnerability, +} + + +def parse(data, allow_custom=False): + """Deserialize a string or file-like object into a STIX object. + + Args: + data: The STIX 2 string to be parsed. + allow_custom (bool): Whether to allow custom properties or not. Default: False. + + Returns: + An instantiated Python STIX object. + """ + + obj = get_dict(data) + + if 'type' not in obj: + raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj)) + + try: + obj_class = OBJ_MAP[obj['type']] + except KeyError: + raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type']) + return obj_class(allow_custom=allow_custom, **obj) + + +def _register_type(new_type): + """Register a custom STIX Object type. + """ + + OBJ_MAP[new_type._type] = new_type diff --git a/stix2/other.py b/stix2/other.py deleted file mode 100644 index cd75745..0000000 --- a/stix2/other.py +++ /dev/null @@ -1,128 +0,0 @@ -"""STIX 2.0 Objects that are neither SDOs nor SROs""" - -from .base import _STIXBase -from .properties import (IDProperty, ListProperty, Property, ReferenceProperty, - SelectorProperty, StringProperty, TimestampProperty, - TypeProperty) -from .utils import NOW, get_dict - - -class ExternalReference(_STIXBase): - _properties = { - 'source_name': StringProperty(required=True), - 'description': StringProperty(), - 'url': StringProperty(), - 'external_id': StringProperty(), - } - - def _check_object_constraints(self): - super(ExternalReference, self)._check_object_constraints() - self._check_at_least_one_property(["description", "external_id", "url"]) - - -class KillChainPhase(_STIXBase): - _properties = { - 'kill_chain_name': StringProperty(required=True), - 'phase_name': StringProperty(required=True), - } - - -class GranularMarking(_STIXBase): - _properties = { - 'marking_ref': ReferenceProperty(required=True, type="marking-definition"), - 'selectors': ListProperty(SelectorProperty, required=True), - } - - -class TLPMarking(_STIXBase): - # TODO: don't allow the creation of any other TLPMarkings than the ones below - _properties = { - 'tlp': Property(required=True) - } - - -class StatementMarking(_STIXBase): - _properties = { - 'statement': StringProperty(required=True) - } - - def __init__(self, statement=None, **kwargs): - # Allow statement as positional args. - if statement and not kwargs.get('statement'): - kwargs['statement'] = statement - - super(StatementMarking, self).__init__(**kwargs) - - -class MarkingProperty(Property): - """Represent the marking objects in the `definition` property of - marking-definition objects. - """ - - def clean(self, value): - if type(value) in [TLPMarking, StatementMarking]: - return value - else: - raise ValueError("must be a Statement or TLP Marking.") - - -class MarkingDefinition(_STIXBase): - _type = 'marking-definition' - _properties = { - 'created': TimestampProperty(default=lambda: NOW), - 'external_references': ListProperty(ExternalReference), - 'created_by_ref': ReferenceProperty(type="identity"), - 'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")), - 'granular_markings': ListProperty(GranularMarking), - 'type': TypeProperty(_type), - 'id': IDProperty(_type), - 'definition_type': StringProperty(required=True), - 'definition': MarkingProperty(required=True), - } - marking_map = { - 'tlp': TLPMarking, - 'statement': StatementMarking, - } - - def __init__(self, **kwargs): - if set(('definition_type', 'definition')).issubset(kwargs.keys()): - # Create correct marking type object - try: - marking_type = self.marking_map[kwargs['definition_type']] - except KeyError: - raise ValueError("definition_type must be a valid marking type") - - if not isinstance(kwargs['definition'], marking_type): - defn = get_dict(kwargs['definition']) - kwargs['definition'] = marking_type(**defn) - - super(MarkingDefinition, self).__init__(**kwargs) - - -TLP_WHITE = MarkingDefinition( - id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", - created="2017-01-20T00:00:00.000Z", - definition_type="tlp", - definition=TLPMarking(tlp="white") -) - -TLP_GREEN = MarkingDefinition( - id="marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da", - created="2017-01-20T00:00:00.000Z", - definition_type="tlp", - definition=TLPMarking(tlp="green") -) - -TLP_AMBER = MarkingDefinition( - id="marking-definition--f88d31f6-486f-44da-b317-01333bde0b82", - created="2017-01-20T00:00:00.000Z", - definition_type="tlp", - definition=TLPMarking(tlp="amber") -) - -TLP_RED = MarkingDefinition( - id="marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", - created="2017-01-20T00:00:00.000Z", - definition_type="tlp", - definition=TLPMarking(tlp="red") -) diff --git a/stix2/properties.py b/stix2/properties.py index db06763..f63ec8b 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -118,6 +118,9 @@ class ListProperty(Property): if type(self.contained) is EmbeddedObjectProperty: obj_type = self.contained.type + elif type(self.contained).__name__ is 'STIXObjectProperty': + # ^ this way of checking doesn't require a circular import + obj_type = type(valid) else: obj_type = self.contained diff --git a/stix2/sdo.py b/stix2/sdo.py index 8115b9d..43c8328 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -3,9 +3,8 @@ import stix2 from .base import _STIXBase -from .common import COMMON_PROPERTIES +from .common import COMMON_PROPERTIES, KillChainPhase from .observables import ObservableProperty -from .other import KillChainPhase from .properties import (IDProperty, IntegerProperty, ListProperty, ReferenceProperty, StringProperty, TimestampProperty, TypeProperty) diff --git a/stix2/test/test_bundle.py b/stix2/test/test_bundle.py index 54d7080..0733637 100644 --- a/stix2/test/test_bundle.py +++ b/stix2/test/test_bundle.py @@ -116,3 +116,15 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh bundle = stix2.Bundle([indicator], malware, objects=[relationship]) assert str(bundle) == EXPECTED_BUNDLE + + +def test_parse_bundle(): + bundle = stix2.parse(EXPECTED_BUNDLE) + + assert bundle.type == "bundle" + assert bundle.id.startswith("bundle--") + assert bundle.spec_version == "2.0" + assert type(bundle.objects[0]) is stix2.Indicator + assert bundle.objects[0].type == 'indicator' + assert bundle.objects[1].type == 'malware' + assert bundle.objects[2].type == 'relationship'