From 035f1a6c7354a7a60f9a82eb9792378a37fda346 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Mon, 2 Oct 2017 12:28:47 -0400 Subject: [PATCH 1/4] Support MarkingDefinition objs, not just ID strs (when passed into `add_markings()` or `remove_markings()`) --- stix2/markings/granular_markings.py | 22 ++++---- stix2/markings/object_markings.py | 6 +-- stix2/markings/utils.py | 15 ++++++ stix2/test/test_granular_markings.py | 78 +++++++++++++++++----------- stix2/test/test_object_markings.py | 23 ++++++-- 5 files changed, 96 insertions(+), 48 deletions(-) diff --git a/stix2/markings/granular_markings.py b/stix2/markings/granular_markings.py index 7e9ccc7..5afd1cc 100644 --- a/stix2/markings/granular_markings.py +++ b/stix2/markings/granular_markings.py @@ -88,6 +88,7 @@ def remove_markings(obj, marking, selectors): """ selectors = utils.convert_to_list(selectors) + marking = utils.convert_to_marking_list(marking) utils.validate(obj, selectors) granular_markings = obj.get("granular_markings") @@ -97,12 +98,9 @@ def remove_markings(obj, marking, selectors): granular_markings = utils.expand_markings(granular_markings) - if isinstance(marking, list): - to_remove = [] - for m in marking: - to_remove.append({"marking_ref": m, "selectors": selectors}) - else: - to_remove = [{"marking_ref": marking, "selectors": selectors}] + to_remove = [] + for m in marking: + to_remove.append({"marking_ref": m, "selectors": selectors}) remove = utils.build_granular_marking(to_remove).get("granular_markings") @@ -140,14 +138,12 @@ def add_markings(obj, marking, selectors): """ selectors = utils.convert_to_list(selectors) + marking = utils.convert_to_marking_list(marking) utils.validate(obj, selectors) - if isinstance(marking, list): - granular_marking = [] - for m in marking: - granular_marking.append({"marking_ref": m, "selectors": sorted(selectors)}) - else: - granular_marking = [{"marking_ref": marking, "selectors": sorted(selectors)}] + granular_marking = [] + for m in marking: + granular_marking.append({"marking_ref": m, "selectors": sorted(selectors)}) if obj.get("granular_markings"): granular_marking.extend(obj.get("granular_markings")) @@ -244,7 +240,7 @@ def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=Fa raise TypeError("Required argument 'selectors' must be provided") selectors = utils.convert_to_list(selectors) - marking = utils.convert_to_list(marking) + marking = utils.convert_to_marking_list(marking) utils.validate(obj, selectors) granular_markings = obj.get("granular_markings", []) diff --git a/stix2/markings/object_markings.py b/stix2/markings/object_markings.py index c39c036..a775ddc 100644 --- a/stix2/markings/object_markings.py +++ b/stix2/markings/object_markings.py @@ -31,7 +31,7 @@ def add_markings(obj, marking): A new version of the given SDO or SRO with specified markings added. """ - marking = utils.convert_to_list(marking) + marking = utils.convert_to_marking_list(marking) object_markings = set(obj.get("object_marking_refs", []) + marking) @@ -55,7 +55,7 @@ def remove_markings(obj, marking): A new version of the given SDO or SRO with specified markings removed. """ - marking = utils.convert_to_list(marking) + marking = utils.convert_to_marking_list(marking) object_markings = obj.get("object_marking_refs", []) @@ -121,7 +121,7 @@ def is_marked(obj, marking=None): provided marking refs match, True is returned. """ - marking = utils.convert_to_list(marking) + marking = utils.convert_to_marking_list(marking) object_markings = obj.get("object_marking_refs", []) if marking: diff --git a/stix2/markings/utils.py b/stix2/markings/utils.py index d0d38bb..1154d19 100644 --- a/stix2/markings/utils.py +++ b/stix2/markings/utils.py @@ -37,6 +37,12 @@ def _validate_selector(obj, selector): return True +def _get_marking_id(marking): + if type(marking).__name__ is 'MarkingDefinition': # avoid circular import + return marking.id + return marking + + def validate(obj, selectors): """Given an SDO or SRO, check that each selector is valid.""" if selectors: @@ -57,6 +63,15 @@ def convert_to_list(data): return [data] +def convert_to_marking_list(data): + """Convert input into a list of marking identifiers.""" + if data is not None: + if isinstance(data, list): + return [_get_marking_id(x) for x in data] + else: + return [_get_marking_id(data)] + + def compress_markings(granular_markings): """ Compress granular markings list. If there is more than one marking diff --git a/stix2/test/test_granular_markings.py b/stix2/test/test_granular_markings.py index e910ad3..f8fc803 100644 --- a/stix2/test/test_granular_markings.py +++ b/stix2/test/test_granular_markings.py @@ -1,7 +1,7 @@ import pytest -from stix2 import Malware, markings +from stix2 import TLP_RED, Malware, markings from .constants import MALWARE_MORE_KWARGS as MALWARE_KWARGS_CONST from .constants import MARKING_IDS @@ -45,6 +45,7 @@ def test_add_marking_mark_one_selector_multiple_refs(): }, ], **MALWARE_KWARGS), + MARKING_IDS[0], ), ( MALWARE_KWARGS, @@ -56,13 +57,26 @@ def test_add_marking_mark_one_selector_multiple_refs(): }, ], **MALWARE_KWARGS), + MARKING_IDS[0], + ), + ( + Malware(**MALWARE_KWARGS), + Malware( + granular_markings=[ + { + "selectors": ["description", "name"], + "marking_ref": TLP_RED.id, + }, + ], + **MALWARE_KWARGS), + TLP_RED, ), ]) def test_add_marking_mark_multiple_selector_one_refs(data): before = data[0] after = data[1] - before = markings.add_markings(before, [MARKING_IDS[0]], ["description", "name"]) + before = markings.add_markings(before, data[2], ["description", "name"]) for m in before["granular_markings"]: assert m in after["granular_markings"] @@ -347,36 +361,42 @@ def test_get_markings_positional_arguments_combinations(data): assert set(markings.get_markings(data, "x.z.foo2", False, True)) == set(["10"]) -@pytest.mark.parametrize("before", [ - Malware( - granular_markings=[ - { - "selectors": ["description"], - "marking_ref": MARKING_IDS[0] - }, - { - "selectors": ["description"], - "marking_ref": MARKING_IDS[1] - }, - ], - **MALWARE_KWARGS +@pytest.mark.parametrize("data", [ + ( + Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + ], + **MALWARE_KWARGS + ), + [MARKING_IDS[0], MARKING_IDS[1]], ), - dict( - granular_markings=[ - { - "selectors": ["description"], - "marking_ref": MARKING_IDS[0] - }, - { - "selectors": ["description"], - "marking_ref": MARKING_IDS[1] - }, - ], - **MALWARE_KWARGS + ( + dict( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + ], + **MALWARE_KWARGS + ), + [MARKING_IDS[0], MARKING_IDS[1]], ), ]) -def test_remove_marking_remove_one_selector_with_multiple_refs(before): - before = markings.remove_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description"]) +def test_remove_marking_remove_one_selector_with_multiple_refs(data): + before = markings.remove_markings(data[0], data[1], ["description"]) assert "granular_markings" not in before diff --git a/stix2/test/test_object_markings.py b/stix2/test/test_object_markings.py index 36e8e4d..10949ab 100644 --- a/stix2/test/test_object_markings.py +++ b/stix2/test/test_object_markings.py @@ -1,7 +1,7 @@ import pytest -from stix2 import Malware, exceptions, markings +from stix2 import TLP_AMBER, Malware, exceptions, markings from .constants import FAKE_TIME, MALWARE_ID, MARKING_IDS from .constants import MALWARE_KWARGS as MALWARE_KWARGS_CONST @@ -21,18 +21,26 @@ MALWARE_KWARGS.update({ Malware(**MALWARE_KWARGS), Malware(object_marking_refs=[MARKING_IDS[0]], **MALWARE_KWARGS), + MARKING_IDS[0], ), ( MALWARE_KWARGS, dict(object_marking_refs=[MARKING_IDS[0]], **MALWARE_KWARGS), + MARKING_IDS[0], + ), + ( + Malware(**MALWARE_KWARGS), + Malware(object_marking_refs=[TLP_AMBER.id], + **MALWARE_KWARGS), + TLP_AMBER, ), ]) def test_add_markings_one_marking(data): before = data[0] after = data[1] - before = markings.add_markings(before, MARKING_IDS[0], None) + before = markings.add_markings(before, data[2], None) for m in before["object_marking_refs"]: assert m in after["object_marking_refs"] @@ -280,19 +288,28 @@ def test_remove_markings_object_level(data): **MALWARE_KWARGS), Malware(object_marking_refs=[MARKING_IDS[1]], **MALWARE_KWARGS), + [MARKING_IDS[0], MARKING_IDS[2]], ), ( dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], **MALWARE_KWARGS), dict(object_marking_refs=[MARKING_IDS[1]], **MALWARE_KWARGS), + [MARKING_IDS[0], MARKING_IDS[2]], + ), + ( + Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], TLP_AMBER.id], + **MALWARE_KWARGS), + Malware(object_marking_refs=[MARKING_IDS[1]], + **MALWARE_KWARGS), + [MARKING_IDS[0], TLP_AMBER], ), ]) def test_remove_markings_multiple(data): before = data[0] after = data[1] - before = markings.remove_markings(before, [MARKING_IDS[0], MARKING_IDS[2]], None) + before = markings.remove_markings(before, data[2], None) assert before['object_marking_refs'] == after['object_marking_refs'] From 58db629de83806f79a5e40c2948648b96177db65 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Mon, 2 Oct 2017 16:09:38 -0400 Subject: [PATCH 2/4] Add markings functions to top level namespace --- stix2/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stix2/__init__.py b/stix2/__init__.py index 7be0904..53c2fb1 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -8,6 +8,8 @@ from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking, MarkingDefinition, StatementMarking, TLPMarking) from .core import Bundle, _register_type, parse from .environment import Environment, ObjectFactory +from .markings import (add_markings, clear_markings, get_markings, is_marked, + remove_markings, set_markings) from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, From 2bdf83401ae78c35f778da190338454323b12424 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Tue, 3 Oct 2017 10:28:58 -0400 Subject: [PATCH 3/4] Add a shortcut for marking functions So marking functions like `add_markings()` can be directly called as methods on SDO/SRO classes. Note that they return new versions of the objects because objects are immutable. --- stix2/common.py | 3 ++- stix2/markings/__init__.py | 13 +++++++++++++ stix2/sdo.py | 27 ++++++++++++++------------- stix2/sro.py | 5 +++-- stix2/test/test_markings.py | 12 +++++++++++- 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/stix2/common.py b/stix2/common.py index a2e6918..d7994c6 100644 --- a/stix2/common.py +++ b/stix2/common.py @@ -3,6 +3,7 @@ from collections import OrderedDict from .base import _STIXBase +from .markings import MarkingsMixin from .properties import (HashesProperty, IDProperty, ListProperty, Property, ReferenceProperty, SelectorProperty, StringProperty, TimestampProperty, TypeProperty) @@ -76,7 +77,7 @@ class MarkingProperty(Property): raise ValueError("must be a Statement, TLP Marking or a registered marking.") -class MarkingDefinition(_STIXBase): +class MarkingDefinition(_STIXBase, MarkingsMixin): _type = 'marking-definition' _properties = OrderedDict() _properties.update([ diff --git a/stix2/markings/__init__.py b/stix2/markings/__init__.py index 4f72e4c..41c761d 100644 --- a/stix2/markings/__init__.py +++ b/stix2/markings/__init__.py @@ -212,3 +212,16 @@ def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=Fa result = result or object_markings.is_marked(obj, object_marks) return result + + +class MarkingsMixin(): + pass + + +# Note that all of these methods will return a new object because of immutability +MarkingsMixin.get_markings = get_markings +MarkingsMixin.set_markings = set_markings +MarkingsMixin.remove_markings = remove_markings +MarkingsMixin.add_markings = add_markings +MarkingsMixin.clear_markings = clear_markings +MarkingsMixin.is_marked = is_marked diff --git a/stix2/sdo.py b/stix2/sdo.py index 77c781a..ea4bd13 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -6,6 +6,7 @@ import stix2 from .base import _STIXBase from .common import ExternalReference, GranularMarking, KillChainPhase +from .markings import MarkingsMixin from .observables import ObservableProperty from .properties import (BooleanProperty, IDProperty, IntegerProperty, ListProperty, PatternProperty, ReferenceProperty, @@ -13,7 +14,7 @@ from .properties import (BooleanProperty, IDProperty, IntegerProperty, from .utils import NOW -class AttackPattern(_STIXBase): +class AttackPattern(_STIXBase, MarkingsMixin): _type = 'attack-pattern' _properties = OrderedDict() @@ -34,7 +35,7 @@ class AttackPattern(_STIXBase): ]) -class Campaign(_STIXBase): +class Campaign(_STIXBase, MarkingsMixin): _type = 'campaign' _properties = OrderedDict() @@ -58,7 +59,7 @@ class Campaign(_STIXBase): ]) -class CourseOfAction(_STIXBase): +class CourseOfAction(_STIXBase, MarkingsMixin): _type = 'course-of-action' _properties = OrderedDict() @@ -78,7 +79,7 @@ class CourseOfAction(_STIXBase): ]) -class Identity(_STIXBase): +class Identity(_STIXBase, MarkingsMixin): _type = 'identity' _properties = OrderedDict() @@ -101,7 +102,7 @@ class Identity(_STIXBase): ]) -class Indicator(_STIXBase): +class Indicator(_STIXBase, MarkingsMixin): _type = 'indicator' _properties = OrderedDict() @@ -125,7 +126,7 @@ class Indicator(_STIXBase): ]) -class IntrusionSet(_STIXBase): +class IntrusionSet(_STIXBase, MarkingsMixin): _type = 'intrusion-set' _properties = OrderedDict() @@ -152,7 +153,7 @@ class IntrusionSet(_STIXBase): ]) -class Malware(_STIXBase): +class Malware(_STIXBase, MarkingsMixin): _type = 'malware' _properties = OrderedDict() @@ -173,7 +174,7 @@ class Malware(_STIXBase): ]) -class ObservedData(_STIXBase): +class ObservedData(_STIXBase, MarkingsMixin): _type = 'observed-data' _properties = OrderedDict() @@ -195,7 +196,7 @@ class ObservedData(_STIXBase): ]) -class Report(_STIXBase): +class Report(_STIXBase, MarkingsMixin): _type = 'report' _properties = OrderedDict() @@ -217,7 +218,7 @@ class Report(_STIXBase): ]) -class ThreatActor(_STIXBase): +class ThreatActor(_STIXBase, MarkingsMixin): _type = 'threat-actor' _properties = OrderedDict() @@ -245,7 +246,7 @@ class ThreatActor(_STIXBase): ]) -class Tool(_STIXBase): +class Tool(_STIXBase, MarkingsMixin): _type = 'tool' _properties = OrderedDict() @@ -267,7 +268,7 @@ class Tool(_STIXBase): ]) -class Vulnerability(_STIXBase): +class Vulnerability(_STIXBase, MarkingsMixin): _type = 'vulnerability' _properties = OrderedDict() @@ -316,7 +317,7 @@ def CustomObject(type='x-custom-type', properties=None): def custom_builder(cls): - class _Custom(cls, _STIXBase): + class _Custom(cls, _STIXBase, MarkingsMixin): _type = type _properties = OrderedDict() _properties.update([ diff --git a/stix2/sro.py b/stix2/sro.py index af483bc..0a8048d 100644 --- a/stix2/sro.py +++ b/stix2/sro.py @@ -4,13 +4,14 @@ from collections import OrderedDict from .base import _STIXBase from .common import ExternalReference, GranularMarking +from .markings import MarkingsMixin from .properties import (BooleanProperty, IDProperty, IntegerProperty, ListProperty, ReferenceProperty, StringProperty, TimestampProperty, TypeProperty) from .utils import NOW -class Relationship(_STIXBase): +class Relationship(_STIXBase, MarkingsMixin): _type = 'relationship' _properties = OrderedDict() @@ -45,7 +46,7 @@ class Relationship(_STIXBase): super(Relationship, self).__init__(**kwargs) -class Sighting(_STIXBase): +class Sighting(_STIXBase, MarkingsMixin): _type = 'sighting' _properties = OrderedDict() _properties.update([ diff --git a/stix2/test/test_markings.py b/stix2/test/test_markings.py index 0c6069a..456bf92 100644 --- a/stix2/test/test_markings.py +++ b/stix2/test/test_markings.py @@ -241,4 +241,14 @@ def test_marking_wrong_type_construction(): assert str(excinfo.value) == "Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]" -# TODO: Add other examples +def test_campaign_add_markings(): + campaign = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00Z", + modified="2016-04-06T20:03:00Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector.", + ) + campaign = campaign.add_markings(TLP_WHITE) + assert campaign.object_marking_refs[0] == TLP_WHITE.id From 3ca8cf820e7a63af65efa32980a6a16b3392c682 Mon Sep 17 00:00:00 2001 From: Chris Lenk Date: Tue, 3 Oct 2017 15:01:55 -0400 Subject: [PATCH 4/4] Add STIXDomainObject and STIXRelationshipObject --- stix2/sdo.py | 30 +++++++++++++++++------------- stix2/sro.py | 8 ++++++-- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/stix2/sdo.py b/stix2/sdo.py index ea4bd13..53f965d 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -14,7 +14,11 @@ from .properties import (BooleanProperty, IDProperty, IntegerProperty, from .utils import NOW -class AttackPattern(_STIXBase, MarkingsMixin): +class STIXDomainObject(_STIXBase, MarkingsMixin): + pass + + +class AttackPattern(STIXDomainObject): _type = 'attack-pattern' _properties = OrderedDict() @@ -35,7 +39,7 @@ class AttackPattern(_STIXBase, MarkingsMixin): ]) -class Campaign(_STIXBase, MarkingsMixin): +class Campaign(STIXDomainObject): _type = 'campaign' _properties = OrderedDict() @@ -59,7 +63,7 @@ class Campaign(_STIXBase, MarkingsMixin): ]) -class CourseOfAction(_STIXBase, MarkingsMixin): +class CourseOfAction(STIXDomainObject): _type = 'course-of-action' _properties = OrderedDict() @@ -79,7 +83,7 @@ class CourseOfAction(_STIXBase, MarkingsMixin): ]) -class Identity(_STIXBase, MarkingsMixin): +class Identity(STIXDomainObject): _type = 'identity' _properties = OrderedDict() @@ -102,7 +106,7 @@ class Identity(_STIXBase, MarkingsMixin): ]) -class Indicator(_STIXBase, MarkingsMixin): +class Indicator(STIXDomainObject): _type = 'indicator' _properties = OrderedDict() @@ -126,7 +130,7 @@ class Indicator(_STIXBase, MarkingsMixin): ]) -class IntrusionSet(_STIXBase, MarkingsMixin): +class IntrusionSet(STIXDomainObject): _type = 'intrusion-set' _properties = OrderedDict() @@ -153,7 +157,7 @@ class IntrusionSet(_STIXBase, MarkingsMixin): ]) -class Malware(_STIXBase, MarkingsMixin): +class Malware(STIXDomainObject): _type = 'malware' _properties = OrderedDict() @@ -174,7 +178,7 @@ class Malware(_STIXBase, MarkingsMixin): ]) -class ObservedData(_STIXBase, MarkingsMixin): +class ObservedData(STIXDomainObject): _type = 'observed-data' _properties = OrderedDict() @@ -196,7 +200,7 @@ class ObservedData(_STIXBase, MarkingsMixin): ]) -class Report(_STIXBase, MarkingsMixin): +class Report(STIXDomainObject): _type = 'report' _properties = OrderedDict() @@ -218,7 +222,7 @@ class Report(_STIXBase, MarkingsMixin): ]) -class ThreatActor(_STIXBase, MarkingsMixin): +class ThreatActor(STIXDomainObject): _type = 'threat-actor' _properties = OrderedDict() @@ -246,7 +250,7 @@ class ThreatActor(_STIXBase, MarkingsMixin): ]) -class Tool(_STIXBase, MarkingsMixin): +class Tool(STIXDomainObject): _type = 'tool' _properties = OrderedDict() @@ -268,7 +272,7 @@ class Tool(_STIXBase, MarkingsMixin): ]) -class Vulnerability(_STIXBase, MarkingsMixin): +class Vulnerability(STIXDomainObject): _type = 'vulnerability' _properties = OrderedDict() @@ -317,7 +321,7 @@ def CustomObject(type='x-custom-type', properties=None): def custom_builder(cls): - class _Custom(cls, _STIXBase, MarkingsMixin): + class _Custom(cls, STIXDomainObject): _type = type _properties = OrderedDict() _properties.update([ diff --git a/stix2/sro.py b/stix2/sro.py index 0a8048d..4fa0465 100644 --- a/stix2/sro.py +++ b/stix2/sro.py @@ -11,7 +11,11 @@ from .properties import (BooleanProperty, IDProperty, IntegerProperty, from .utils import NOW -class Relationship(_STIXBase, MarkingsMixin): +class STIXRelationshipObject(_STIXBase, MarkingsMixin): + pass + + +class Relationship(STIXRelationshipObject): _type = 'relationship' _properties = OrderedDict() @@ -46,7 +50,7 @@ class Relationship(_STIXBase, MarkingsMixin): super(Relationship, self).__init__(**kwargs) -class Sighting(_STIXBase, MarkingsMixin): +class Sighting(STIXRelationshipObject): _type = 'sighting' _properties = OrderedDict() _properties.update([