diff --git a/.travis.yml b/.travis.yml index fdbb686..aba764d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,6 @@ sudo: false language: python cache: pip python: - - "2.6" - "2.7" - "3.3" - "3.4" diff --git a/setup.py b/setup.py index 3687e57..e359147 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,6 @@ setup( keywords="stix stix2 json cti cyber threat intelligence", packages=find_packages(), install_requires=[ - 'ordereddict ; python_version<"2.7"', 'python-dateutil', 'pytz', 'requests', diff --git a/stix2/__init__.py b/stix2/__init__.py index 6e89531..c2aae2e 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -42,5 +42,5 @@ from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) from .sro import Relationship, Sighting -from .utils import get_dict +from .utils import get_dict, new_version, revoke from .version import __version__ diff --git a/stix2/base.py b/stix2/base.py index 2e7f026..5608102 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -10,10 +10,11 @@ from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, ExtraPropertiesError, ImmutableError, InvalidObjRefError, InvalidValueError, MissingPropertiesError, - MutuallyExclusivePropertiesError, RevokeError, - UnmodifiablePropertyError) -from .utils import (NOW, find_property_index, format_datetime, get_timestamp, - parse_into_datetime) + MutuallyExclusivePropertiesError) +from .markings.utils import validate +from .utils import NOW, find_property_index, format_datetime, get_timestamp +from .utils import new_version as _new_version +from .utils import revoke as _revoke __all__ = ['STIXJSONEncoder', '_STIXBase'] @@ -85,8 +86,7 @@ class _STIXBase(collections.Mapping): def _check_object_constraints(self): for m in self.get("granular_markings", []): - # TODO: check selectors - pass + validate(self, m.get("selectors")) def __init__(self, allow_custom=False, **kwargs): cls = self.__class__ @@ -173,30 +173,10 @@ class _STIXBase(collections.Mapping): # Versioning API def new_version(self, **kwargs): - unchangable_properties = [] - if self.get("revoked"): - raise RevokeError("new_version") - new_obj_inner = copy.deepcopy(self._inner) - properties_to_change = kwargs.keys() - for prop in ["created", "created_by_ref", "id", "type"]: - if prop in properties_to_change: - unchangable_properties.append(prop) - if unchangable_properties: - raise UnmodifiablePropertyError(unchangable_properties) - cls = type(self) - if 'modified' not in kwargs: - kwargs['modified'] = get_timestamp() - else: - new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond') - if new_modified_property < self.modified: - raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.") - new_obj_inner.update(kwargs) - return cls(**new_obj_inner) + return _new_version(self, **kwargs) def revoke(self): - if self.get("revoked"): - raise RevokeError("revoke") - return self.new_version(revoked=True) + return _revoke(self) class _Observable(_STIXBase): diff --git a/stix2/common.py b/stix2/common.py index 6242989..a2e6918 100644 --- a/stix2/common.py +++ b/stix2/common.py @@ -1,9 +1,6 @@ """STIX 2 Common Data Types and Properties""" -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict +from collections import OrderedDict from .base import _STIXBase from .properties import (HashesProperty, IDProperty, ListProperty, Property, diff --git a/stix2/core.py b/stix2/core.py index 0d0d1d2..be2a53d 100644 --- a/stix2/core.py +++ b/stix2/core.py @@ -1,9 +1,6 @@ """STIX 2.0 Objects that are neither SDOs nor SROs""" -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict +from collections import OrderedDict from . import exceptions from .base import _STIXBase diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 5a9e7b2..32db472 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -157,3 +157,29 @@ class ParseError(STIXError, ValueError): def __init__(self, msg): super(ParseError, self).__init__(msg) + + +class InvalidSelectorError(STIXError, AssertionError): + """Granular Marking selector violation. The selector must resolve into an existing STIX object property.""" + + def __init__(self, cls, key): + super(InvalidSelectorError, self).__init__() + self.cls = cls + self.key = key + + def __str__(self): + msg = "Selector {0} in {1} is not valid!" + return msg.format(self.key, self.cls.__class__.__name__) + + +class MarkingNotFoundError(STIXError, AssertionError): + """Marking violation. The marking reference must be present in SDO or SRO.""" + + def __init__(self, cls, key): + super(MarkingNotFoundError, self).__init__() + self.cls = cls + self.key = key + + def __str__(self): + msg = "Marking {0} was not found in {1}!" + return msg.format(self.key, self.cls.__class__.__name__) diff --git a/stix2/markings/__init__.py b/stix2/markings/__init__.py new file mode 100644 index 0000000..4f72e4c --- /dev/null +++ b/stix2/markings/__init__.py @@ -0,0 +1,214 @@ +""" +Python STIX 2.0 Data Markings API. + +These high level functions will operate on both object level markings and +granular markings unless otherwise noted in each of the functions. +""" + +from stix2.markings import granular_markings, object_markings + + +def get_markings(obj, selectors=None, inherited=False, descendants=False): + """ + Get all markings associated to the field(s). + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + inherited: If True, include object level markings and granular markings + inherited relative to the properties. + descendants: If True, include granular markings applied to any children + relative to the properties. + + Returns: + list: Marking identifiers that matched the selectors expression. + + Note: + If ``selectors`` is None, operation will be performed only on object + level markings. + + """ + if selectors is None: + return object_markings.get_markings(obj) + + results = granular_markings.get_markings( + obj, + selectors, + inherited, + descendants + ) + + if inherited: + results.extend(object_markings.get_markings(obj)) + + return list(set(results)) + + +def set_markings(obj, marking, selectors=None): + """ + Removes all markings associated with selectors and appends a new granular + marking. Refer to `clear_markings` and `add_markings` for details. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + + Returns: + A new version of the given SDO or SRO with specified markings removed + and new ones added. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + return object_markings.set_markings(obj, marking) + else: + return granular_markings.set_markings(obj, marking, selectors) + + +def remove_markings(obj, marking, selectors=None): + """ + Removes granular_marking from the granular_markings collection. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + MarkingNotFoundError: If markings to remove are not found on + the provided SDO or SRO. + + Returns: + A new version of the given SDO or SRO with specified markings removed. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + return object_markings.remove_markings(obj, marking) + else: + return granular_markings.remove_markings(obj, marking, selectors) + + +def add_markings(obj, marking, selectors=None): + """ + Appends a granular_marking to the granular_markings collection. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + + Returns: + A new version of the given SDO or SRO with specified markings added. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + return object_markings.add_markings(obj, marking) + else: + return granular_markings.add_markings(obj, marking, selectors) + + +def clear_markings(obj, selectors=None): + """ + Removes all granular_marking associated with the selectors. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the field(s) appear(s). + + Raises: + InvalidSelectorError: If `selectors` fail validation. + MarkingNotFoundError: If markings to remove are not found on + the provided SDO or SRO. + + Returns: + A new version of the given SDO or SRO with specified markings cleared. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + return object_markings.clear_markings(obj) + else: + return granular_markings.clear_markings(obj, selectors) + + +def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=False): + """ + Checks if field(s) is marked by any marking or by specific marking(s). + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + inherited: If True, include object level markings and granular markings + inherited to determine if the properties is/are marked. + descendants: If True, include granular markings applied to any children + of the given selector to determine if the properties is/are marked. + + Returns: + bool: True if ``selectors`` is found on internal SDO or SRO collection. + False otherwise. + + Note: + When a list of marking identifiers is provided, if ANY of the provided + marking identifiers match, True is returned. + + If ``selectors`` is None, operation will be performed only on object + level markings. + + """ + if selectors is None: + return object_markings.is_marked(obj, marking) + + result = granular_markings.is_marked( + obj, + marking, + selectors, + inherited, + descendants + ) + + if inherited: + granular_marks = granular_markings.get_markings(obj, selectors) + object_marks = object_markings.get_markings(obj) + + if granular_marks: + result = granular_markings.is_marked( + obj, + granular_marks, + selectors, + inherited, + descendants + ) + + result = result or object_markings.is_marked(obj, object_marks) + + return result diff --git a/stix2/markings/granular_markings.py b/stix2/markings/granular_markings.py new file mode 100644 index 0000000..7e9ccc7 --- /dev/null +++ b/stix2/markings/granular_markings.py @@ -0,0 +1,273 @@ + +from stix2 import exceptions +from stix2.markings import utils +from stix2.utils import new_version + + +def get_markings(obj, selectors, inherited=False, descendants=False): + """ + Get all markings associated to with the properties. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selector strings relative to the SDO or + SRO in which the properties appear. + inherited: If True, include markings inherited relative to the + properties. + descendants: If True, include granular markings applied to any children + relative to the properties. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + + Returns: + list: Marking identifiers that matched the selectors expression. + + """ + selectors = utils.convert_to_list(selectors) + utils.validate(obj, selectors) + + granular_markings = obj.get("granular_markings", []) + + if not granular_markings: + return [] + + results = set() + + for marking in granular_markings: + for user_selector in selectors: + for marking_selector in marking.get("selectors", []): + if any([(user_selector == marking_selector), # Catch explicit selectors. + (user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors. + (marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors + refs = marking.get("marking_ref", []) + results.update([refs]) + + return list(results) + + +def set_markings(obj, marking, selectors): + """ + Removes all markings associated with selectors and appends a new granular + marking. Refer to `clear_markings` and `add_markings` for details. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selector strings relative to the SDO or + SRO in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + + Returns: + A new version of the given SDO or SRO with specified markings removed + and new ones added. + + """ + obj = clear_markings(obj, selectors) + return add_markings(obj, marking, selectors) + + +def remove_markings(obj, marking, selectors): + """ + Removes granular_marking from the granular_markings collection. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + MarkingNotFoundError: If markings to remove are not found on + the provided SDO or SRO. + + Returns: + A new version of the given SDO or SRO with specified markings removed. + + """ + selectors = utils.convert_to_list(selectors) + utils.validate(obj, selectors) + + granular_markings = obj.get("granular_markings") + + if not granular_markings: + return obj + + granular_markings = utils.expand_markings(granular_markings) + + if isinstance(marking, list): + to_remove = [] + for m in marking: + to_remove.append({"marking_ref": m, "selectors": selectors}) + else: + to_remove = [{"marking_ref": marking, "selectors": selectors}] + + remove = utils.build_granular_marking(to_remove).get("granular_markings") + + if not any(marking in granular_markings for marking in remove): + raise exceptions.MarkingNotFoundError(obj, remove) + + granular_markings = [ + m for m in granular_markings if m not in remove + ] + + granular_markings = utils.compress_markings(granular_markings) + + if granular_markings: + return new_version(obj, granular_markings=granular_markings) + else: + return new_version(obj, granular_markings=None) + + +def add_markings(obj, marking, selectors): + """ + Appends a granular_marking to the granular_markings collection. + + Args: + obj: An SDO or SRO object. + selectors: list of type string, selectors must be relative to the TLO + in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + + Returns: + A new version of the given SDO or SRO with specified markings added. + + """ + selectors = utils.convert_to_list(selectors) + utils.validate(obj, selectors) + + if isinstance(marking, list): + granular_marking = [] + for m in marking: + granular_marking.append({"marking_ref": m, "selectors": sorted(selectors)}) + else: + granular_marking = [{"marking_ref": marking, "selectors": sorted(selectors)}] + + if obj.get("granular_markings"): + granular_marking.extend(obj.get("granular_markings")) + + granular_marking = utils.expand_markings(granular_marking) + granular_marking = utils.compress_markings(granular_marking) + return new_version(obj, granular_markings=granular_marking) + + +def clear_markings(obj, selectors): + """ + Removes all granular_markings associated with the selectors. + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + MarkingNotFoundError: If markings to remove are not found on + the provided SDO or SRO. + + Returns: + A new version of the given SDO or SRO with specified markings cleared. + + """ + selectors = utils.convert_to_list(selectors) + utils.validate(obj, selectors) + + granular_markings = obj.get("granular_markings") + + if not granular_markings: + return obj + + granular_markings = utils.expand_markings(granular_markings) + + sdo = utils.build_granular_marking( + [{"selectors": selectors, "marking_ref": "N/A"}] + ) + + clear = sdo.get("granular_markings", []) + + if not any(clear_selector in sdo_selectors.get("selectors", []) + for sdo_selectors in granular_markings + for clear_marking in clear + for clear_selector in clear_marking.get("selectors", []) + ): + raise exceptions.MarkingNotFoundError(obj, clear) + + for granular_marking in granular_markings: + for s in selectors: + if s in granular_marking.get("selectors", []): + marking_refs = granular_marking.get("marking_ref") + + if marking_refs: + granular_marking["marking_ref"] = "" + + granular_markings = utils.compress_markings(granular_markings) + + if granular_markings: + return new_version(obj, granular_markings=granular_markings) + else: + return new_version(obj, granular_markings=None) + + +def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=False): + """ + Checks if field is marked by any marking or by specific marking(s). + + Args: + obj: An SDO or SRO object. + selectors: string or list of selectors strings relative to the SDO or + SRO in which the properties appear. + marking: identifier or list of marking identifiers that apply to the + properties selected by `selectors`. + inherited: If True, return markings inherited from the given selector. + descendants: If True, return granular markings applied to any children + of the given selector. + + Raises: + InvalidSelectorError: If `selectors` fail validation. + + Returns: + bool: True if ``selectors`` is found on internal SDO or SRO collection. + False otherwise. + + Note: + When a list of marking identifiers is provided, if ANY of the provided + marking identifiers match, True is returned. + + """ + if selectors is None: + raise TypeError("Required argument 'selectors' must be provided") + + selectors = utils.convert_to_list(selectors) + marking = utils.convert_to_list(marking) + utils.validate(obj, selectors) + + granular_markings = obj.get("granular_markings", []) + + marked = False + markings = set() + + for granular_marking in granular_markings: + for user_selector in selectors: + for marking_selector in granular_marking.get("selectors", []): + + if any([(user_selector == marking_selector), # Catch explicit selectors. + (user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors. + (marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors + marking_ref = granular_marking.get("marking_ref", "") + + if marking and any(x == marking_ref for x in marking): + markings.update([marking_ref]) + + marked = True + + if marking: + # All user-provided markings must be found. + return markings.issuperset(set(marking)) + + return marked diff --git a/stix2/markings/object_markings.py b/stix2/markings/object_markings.py new file mode 100644 index 0000000..c39c036 --- /dev/null +++ b/stix2/markings/object_markings.py @@ -0,0 +1,130 @@ + +from stix2 import exceptions +from stix2.markings import utils +from stix2.utils import new_version + + +def get_markings(obj): + """ + Get all object level markings from the given SDO or SRO object. + + Args: + obj: A SDO or SRO object. + + Returns: + list: Marking identifiers contained in the SDO or SRO. Empty list if no + markings are present in `object_marking_refs`. + + """ + return obj.get("object_marking_refs", []) + + +def add_markings(obj, marking): + """ + Appends an object level marking to the object_marking_refs collection. + + Args: + obj: A SDO or SRO object. + marking: identifier or list of identifiers to apply SDO or SRO object. + + Returns: + A new version of the given SDO or SRO with specified markings added. + + """ + marking = utils.convert_to_list(marking) + + object_markings = set(obj.get("object_marking_refs", []) + marking) + + return new_version(obj, object_marking_refs=list(object_markings)) + + +def remove_markings(obj, marking): + """ + Removes object level marking from the object_marking_refs collection. + + Args: + obj: A SDO or SRO object. + marking: identifier or list of identifiers that apply to the + SDO or SRO object. + + Raises: + MarkingNotFoundError: If markings to remove are not found on + the provided SDO or SRO. + + Returns: + A new version of the given SDO or SRO with specified markings removed. + + """ + marking = utils.convert_to_list(marking) + + object_markings = obj.get("object_marking_refs", []) + + if not object_markings: + return obj + + if any(x not in obj["object_marking_refs"] for x in marking): + raise exceptions.MarkingNotFoundError(obj, marking) + + new_markings = [x for x in object_markings if x not in marking] + if new_markings: + return new_version(obj, object_marking_refs=new_markings) + else: + return new_version(obj, object_marking_refs=None) + + +def set_markings(obj, marking): + """ + Removes all object level markings and appends new object level markings to + the collection. Refer to `clear_markings` and `add_markings` for details. + + Args: + obj: A SDO or SRO object. + marking: identifier or list of identifiers to apply in the + SDO or SRO object. + + Returns: + A new version of the given SDO or SRO with specified markings removed + and new ones added. + + """ + return add_markings(clear_markings(obj), marking) + + +def clear_markings(obj): + """ + Removes all object level markings from the object_marking_refs collection. + + Args: + obj: A SDO or SRO object. + + Returns: + A new version of the given SDO or SRO with object_marking_refs cleared. + + """ + return new_version(obj, object_marking_refs=None) + + +def is_marked(obj, marking=None): + """ + Checks if SDO or SRO is marked by any marking or by specific marking(s). + + Args: + obj: A SDO or SRO object. + marking: identifier or list of marking identifiers that apply to the + SDO or SRO object. + + Returns: + bool: True if SDO or SRO has object level markings. False otherwise. + + Note: + When an identifier or list of identifiers is provided, if ANY of the + provided marking refs match, True is returned. + + """ + marking = utils.convert_to_list(marking) + object_markings = obj.get("object_marking_refs", []) + + if marking: + return any(x in object_markings for x in marking) + else: + return bool(object_markings) diff --git a/stix2/markings/utils.py b/stix2/markings/utils.py new file mode 100644 index 0000000..d0d38bb --- /dev/null +++ b/stix2/markings/utils.py @@ -0,0 +1,229 @@ + +import collections + +import six + +from stix2 import exceptions + + +def _evaluate_expression(obj, selector): + """ + Walks an SDO or SRO generating selectors to match against ``selector``. If + a match is found and the the value of this property is present in the + objects. Matching value of the property will be returned. + + Args: + obj: An SDO or SRO object. + selector: A string following the selector syntax. + + Returns: + list: Values contained in matching property. Otherwise empty list. + + """ + for items, value in iterpath(obj): + path = ".".join(items) + + if path == selector and value: + return [value] + + return [] + + +def _validate_selector(obj, selector): + """Internal method to evaluate each selector.""" + results = list(_evaluate_expression(obj, selector)) + + if len(results) >= 1: + return True + + +def validate(obj, selectors): + """Given an SDO or SRO, check that each selector is valid.""" + if selectors: + for s in selectors: + if not _validate_selector(obj, s): + raise exceptions.InvalidSelectorError(obj, s) + return + + raise exceptions.InvalidSelectorError(obj, selectors) + + +def convert_to_list(data): + """Convert input into a list for further processing.""" + if data is not None: + if isinstance(data, list): + return data + else: + return [data] + + +def compress_markings(granular_markings): + """ + Compress granular markings list. If there is more than one marking + identifier matches. It will collapse into a single granular marking. + + Examples: + Input: + [ + { + "selectors": [ + "description" + ], + "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + }, + { + "selectors": [ + "name" + ], + "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + } + ] + + Output: + [ + { + "selectors": [ + "description", + "name" + ], + "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + } + ] + + Args: + granular_markings: The granular markings list property present in a + SDO or SRO. + + Returns: + list: A list with all markings collapsed. + + """ + if not granular_markings: + return + + map_ = collections.defaultdict(set) + + for granular_marking in granular_markings: + if granular_marking.get("marking_ref"): + map_[granular_marking.get("marking_ref")].update(granular_marking.get("selectors")) + + compressed = \ + [ + {"marking_ref": marking_ref, "selectors": sorted(selectors)} + for marking_ref, selectors in six.iteritems(map_) + ] + + return compressed + + +def expand_markings(granular_markings): + """ + Expands granular markings list. If there is more than one selector per + granular marking. It will be expanded using the same marking_ref. + + Examples: + Input: + [ + { + "selectors": [ + "description", + "name" + ], + "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + } + ] + + Output: + [ + { + "selectors": [ + "description" + ], + "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + }, + { + "selectors": [ + "name" + ], + "marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9" + } + ] + + Args: + granular_markings: The granular markings list property present in a + SDO or SRO. + + Returns: + list: A list with all markings expanded. + + """ + expanded = [] + + for marking in granular_markings: + selectors = marking.get("selectors") + marking_ref = marking.get("marking_ref") + + expanded.extend( + [ + {"marking_ref": marking_ref, "selectors": [selector]} + for selector in selectors + ] + ) + + return expanded + + +def build_granular_marking(granular_marking): + """Returns a dictionary with the required structure for a granular + marking""" + return {"granular_markings": expand_markings(granular_marking)} + + +def iterpath(obj, path=None): + """ + Generator which walks the input ``obj`` model. Each iteration yields a + tuple containing a list of ancestors and the property value. + + Args: + obj: An SDO or SRO object. + path: None, used recursively to store ancestors. + + Example: + >>> for item in iterpath(obj): + >>> print(item) + (['type'], 'campaign') + ... + (['cybox', 'objects', '[0]', 'hashes', 'sha1'], 'cac35ec206d868b7d7cb0b55f31d9425b075082b') + + Returns: + tuple: Containing two items: a list of ancestors and the + property value. + + """ + if path is None: + path = [] + + for varname, varobj in iter(sorted(six.iteritems(obj))): + path.append(varname) + yield (path, varobj) + + if isinstance(varobj, dict): + + for item in iterpath(varobj, path): + yield item + + elif isinstance(varobj, list): + + for item in varobj: + index = "[{0}]".format(varobj.index(item)) + path.append(index) + + yield (path, item) + + if isinstance(item, dict): + for descendant in iterpath(item, path): + yield descendant + + path.pop() + + path.pop() diff --git a/stix2/observables.py b/stix2/observables.py index b9dcf7f..4caaaa5 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -5,10 +5,7 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable and do not have a '_type' attribute. """ -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict +from collections import OrderedDict from .base import _Extension, _Observable, _STIXBase from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError, diff --git a/stix2/properties.py b/stix2/properties.py index 6406ad4..4889a45 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -312,6 +312,7 @@ class ReferenceProperty(Property): def clean(self, value): if isinstance(value, _STIXBase): value = value.id + value = str(value) if self.type: if not value.startswith(self.type): raise ValueError("must start with '{0}'.".format(self.type)) diff --git a/stix2/sdo.py b/stix2/sdo.py index 8c27d11..77c781a 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -1,9 +1,6 @@ """STIX 2.0 Domain Objects""" -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict +from collections import OrderedDict import stix2 diff --git a/stix2/sro.py b/stix2/sro.py index 05a29ed..af483bc 100644 --- a/stix2/sro.py +++ b/stix2/sro.py @@ -1,9 +1,6 @@ """STIX 2.0 Relationship Objects.""" -try: - from collections import OrderedDict -except ImportError: - from ordereddict import OrderedDict +from collections import OrderedDict from .base import _STIXBase from .common import ExternalReference, GranularMarking diff --git a/stix2/test/constants.py b/stix2/test/constants.py index 958120b..839b547 100644 --- a/stix2/test/constants.py +++ b/stix2/test/constants.py @@ -20,6 +20,26 @@ TOOL_ID = "tool--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f" SIGHTING_ID = "sighting--bfbc19db-ec35-4e45-beed-f8bde2a772fb" VULNERABILITY_ID = "vulnerability--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061" +MARKING_IDS = [ + "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", + "marking-definition--443eb5c3-a76c-4a0a-8caa-e93998e7bc09", + "marking-definition--57fcd772-9c1d-41b0-8d1f-3d47713415d9", + "marking-definition--462bf1a6-03d2-419c-b74e-eee2238b2de4", + "marking-definition--68520ae2-fefe-43a9-84ee-2c2a934d2c7d", + "marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f", +] + +# All required args for a Campaign instance, plus some optional args +CAMPAIGN_MORE_KWARGS = dict( + type='campaign', + id=CAMPAIGN_ID, + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector.", +) + # Minimum required args for an Identity instance IDENTITY_KWARGS = dict( name="John Smith", @@ -38,6 +58,17 @@ MALWARE_KWARGS = dict( name="Cryptolocker", ) +# All required args for a Malware instance, plus some optional args +MALWARE_MORE_KWARGS = dict( + type='malware', + id=MALWARE_ID, + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + labels=['ransomware'], + name="Cryptolocker", + description="A ransomware related to ..." +) + # Minimum required args for a Relationship instance RELATIONSHIP_KWARGS = dict( relationship_type="indicates", diff --git a/stix2/test/test_granular_markings.py b/stix2/test/test_granular_markings.py new file mode 100644 index 0000000..e910ad3 --- /dev/null +++ b/stix2/test/test_granular_markings.py @@ -0,0 +1,1026 @@ + +import pytest + +from stix2 import Malware, markings + +from .constants import MALWARE_MORE_KWARGS as MALWARE_KWARGS_CONST +from .constants import MARKING_IDS + +"""Tests for the Data Markings API.""" + +MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy() + + +def test_add_marking_mark_one_selector_multiple_refs(): + before = Malware( + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + ], + **MALWARE_KWARGS + ) + before = markings.add_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description"]) + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +@pytest.mark.parametrize("data", [ + ( + Malware(**MALWARE_KWARGS), + Malware( + granular_markings=[ + { + "selectors": ["description", "name"], + "marking_ref": MARKING_IDS[0] + }, + ], + **MALWARE_KWARGS), + ), + ( + MALWARE_KWARGS, + dict( + granular_markings=[ + { + "selectors": ["description", "name"], + "marking_ref": MARKING_IDS[0] + }, + ], + **MALWARE_KWARGS), + ), +]) +def test_add_marking_mark_multiple_selector_one_refs(data): + before = data[0] + after = data[1] + + before = markings.add_markings(before, [MARKING_IDS[0]], ["description", "name"]) + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_add_marking_mark_multiple_selector_multiple_refs(): + before = Malware( + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description", "name"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description", "name"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + before = markings.add_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description", "name"]) + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_add_marking_mark_another_property_same_marking(): + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + ], + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description", "name"], + "marking_ref": MARKING_IDS[0] + }, + ], + **MALWARE_KWARGS + ) + before = markings.add_markings(before, [MARKING_IDS[0]], ["name"]) + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_add_marking_mark_same_property_same_marking(): + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + ], + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + ], + **MALWARE_KWARGS + ) + before = markings.add_markings(before, [MARKING_IDS[0]], ["description"]) + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +@pytest.mark.parametrize("data,marking", [ + ({"description": "test description"}, + [["title"], ["marking-definition--1", "marking-definition--2"], + "", ["marking-definition--1", "marking-definition--2"], + [], ["marking-definition--1", "marking-definition--2"], + [""], ["marking-definition--1", "marking-definition--2"], + ["description"], [""], + ["description"], [], + ["description"], ["marking-definition--1", 456] + ]) +]) +def test_add_marking_bad_selector(data, marking): + with pytest.raises(AssertionError): + markings.add_markings(data, marking[0], marking[1]) + + +GET_MARKINGS_TEST_DATA = { + "a": 333, + "b": "value", + "c": [ + 17, + "list value", + { + "g": "nested", + "h": 45 + } + ], + "x": { + "y": [ + "hello", + 88 + ], + "z": { + "foo1": "bar", + "foo2": 65 + } + }, + "granular_markings": [ + { + "marking_ref": "1", + "selectors": ["a"] + }, + { + "marking_ref": "2", + "selectors": ["c"] + }, + { + "marking_ref": "3", + "selectors": ["c.[1]"] + }, + { + "marking_ref": "4", + "selectors": ["c.[2]"] + }, + { + "marking_ref": "5", + "selectors": ["c.[2].g"] + }, + { + "marking_ref": "6", + "selectors": ["x"] + }, + { + "marking_ref": "7", + "selectors": ["x.y"] + }, + { + "marking_ref": "8", + "selectors": ["x.y.[1]"] + }, + { + "marking_ref": "9", + "selectors": ["x.z"] + }, + { + "marking_ref": "10", + "selectors": ["x.z.foo2"] + }, + ] +} + + +@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA]) +def test_get_markings_smoke(data): + """Test get_markings does not fail.""" + assert len(markings.get_markings(data, "a")) >= 1 + assert markings.get_markings(data, "a") == ["1"] + + +@pytest.mark.parametrize("data", [ + GET_MARKINGS_TEST_DATA, + {"b": 1234}, +]) +def test_get_markings_not_marked(data): + """Test selector that is not marked returns empty list.""" + results = markings.get_markings(data, "b") + assert len(results) == 0 + + +@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA]) +def test_get_markings_multiple_selectors(data): + """Test multiple selectors return combination of markings.""" + total = markings.get_markings(data, ["x.y", "x.z"]) + xy_markings = markings.get_markings(data, ["x.y"]) + xz_markings = markings.get_markings(data, ["x.z"]) + + assert set(xy_markings).issubset(total) + assert set(xz_markings).issubset(total) + assert set(xy_markings).union(xz_markings).issuperset(total) + + +@pytest.mark.parametrize("data,selector", [ + (GET_MARKINGS_TEST_DATA, "foo"), + (GET_MARKINGS_TEST_DATA, ""), + (GET_MARKINGS_TEST_DATA, []), + (GET_MARKINGS_TEST_DATA, [""]), + (GET_MARKINGS_TEST_DATA, "x.z.[-2]"), + (GET_MARKINGS_TEST_DATA, "c.f"), + (GET_MARKINGS_TEST_DATA, "c.[2].i"), + (GET_MARKINGS_TEST_DATA, "c.[3]"), + (GET_MARKINGS_TEST_DATA, "d"), + (GET_MARKINGS_TEST_DATA, "x.[0]"), + (GET_MARKINGS_TEST_DATA, "z.y.w"), + (GET_MARKINGS_TEST_DATA, "x.z.[1]"), + (GET_MARKINGS_TEST_DATA, "x.z.foo3") +]) +def test_get_markings_bad_selector(data, selector): + """Test bad selectors raise exception""" + with pytest.raises(AssertionError): + markings.get_markings(data, selector) + + +@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA]) +def test_get_markings_positional_arguments_combinations(data): + """Test multiple combinations for inherited and descendant markings.""" + assert set(markings.get_markings(data, "a", False, False)) == set(["1"]) + assert set(markings.get_markings(data, "a", True, False)) == set(["1"]) + assert set(markings.get_markings(data, "a", True, True)) == set(["1"]) + assert set(markings.get_markings(data, "a", False, True)) == set(["1"]) + + assert set(markings.get_markings(data, "b", False, False)) == set([]) + assert set(markings.get_markings(data, "b", True, False)) == set([]) + assert set(markings.get_markings(data, "b", True, True)) == set([]) + assert set(markings.get_markings(data, "b", False, True)) == set([]) + + assert set(markings.get_markings(data, "c", False, False)) == set(["2"]) + assert set(markings.get_markings(data, "c", True, False)) == set(["2"]) + assert set(markings.get_markings(data, "c", True, True)) == set(["2", "3", "4", "5"]) + assert set(markings.get_markings(data, "c", False, True)) == set(["2", "3", "4", "5"]) + + assert set(markings.get_markings(data, "c.[0]", False, False)) == set([]) + assert set(markings.get_markings(data, "c.[0]", True, False)) == set(["2"]) + assert set(markings.get_markings(data, "c.[0]", True, True)) == set(["2"]) + assert set(markings.get_markings(data, "c.[0]", False, True)) == set([]) + + assert set(markings.get_markings(data, "c.[1]", False, False)) == set(["3"]) + assert set(markings.get_markings(data, "c.[1]", True, False)) == set(["2", "3"]) + assert set(markings.get_markings(data, "c.[1]", True, True)) == set(["2", "3"]) + assert set(markings.get_markings(data, "c.[1]", False, True)) == set(["3"]) + + assert set(markings.get_markings(data, "c.[2]", False, False)) == set(["4"]) + assert set(markings.get_markings(data, "c.[2]", True, False)) == set(["2", "4"]) + assert set(markings.get_markings(data, "c.[2]", True, True)) == set(["2", "4", "5"]) + assert set(markings.get_markings(data, "c.[2]", False, True)) == set(["4", "5"]) + + assert set(markings.get_markings(data, "c.[2].g", False, False)) == set(["5"]) + assert set(markings.get_markings(data, "c.[2].g", True, False)) == set(["2", "4", "5"]) + assert set(markings.get_markings(data, "c.[2].g", True, True)) == set(["2", "4", "5"]) + assert set(markings.get_markings(data, "c.[2].g", False, True)) == set(["5"]) + + assert set(markings.get_markings(data, "x", False, False)) == set(["6"]) + assert set(markings.get_markings(data, "x", True, False)) == set(["6"]) + assert set(markings.get_markings(data, "x", True, True)) == set(["6", "7", "8", "9", "10"]) + assert set(markings.get_markings(data, "x", False, True)) == set(["6", "7", "8", "9", "10"]) + + assert set(markings.get_markings(data, "x.y", False, False)) == set(["7"]) + assert set(markings.get_markings(data, "x.y", True, False)) == set(["6", "7"]) + assert set(markings.get_markings(data, "x.y", True, True)) == set(["6", "7", "8"]) + assert set(markings.get_markings(data, "x.y", False, True)) == set(["7", "8"]) + + assert set(markings.get_markings(data, "x.y.[0]", False, False)) == set([]) + assert set(markings.get_markings(data, "x.y.[0]", True, False)) == set(["6", "7"]) + assert set(markings.get_markings(data, "x.y.[0]", True, True)) == set(["6", "7"]) + assert set(markings.get_markings(data, "x.y.[0]", False, True)) == set([]) + + assert set(markings.get_markings(data, "x.y.[1]", False, False)) == set(["8"]) + assert set(markings.get_markings(data, "x.y.[1]", True, False)) == set(["6", "7", "8"]) + assert set(markings.get_markings(data, "x.y.[1]", True, True)) == set(["6", "7", "8"]) + assert set(markings.get_markings(data, "x.y.[1]", False, True)) == set(["8"]) + + assert set(markings.get_markings(data, "x.z", False, False)) == set(["9"]) + assert set(markings.get_markings(data, "x.z", True, False)) == set(["6", "9"]) + assert set(markings.get_markings(data, "x.z", True, True)) == set(["6", "9", "10"]) + assert set(markings.get_markings(data, "x.z", False, True)) == set(["9", "10"]) + + assert set(markings.get_markings(data, "x.z.foo1", False, False)) == set([]) + assert set(markings.get_markings(data, "x.z.foo1", True, False)) == set(["6", "9"]) + assert set(markings.get_markings(data, "x.z.foo1", True, True)) == set(["6", "9"]) + assert set(markings.get_markings(data, "x.z.foo1", False, True)) == set([]) + + assert set(markings.get_markings(data, "x.z.foo2", False, False)) == set(["10"]) + assert set(markings.get_markings(data, "x.z.foo2", True, False)) == set(["6", "9", "10"]) + assert set(markings.get_markings(data, "x.z.foo2", True, True)) == set(["6", "9", "10"]) + assert set(markings.get_markings(data, "x.z.foo2", False, True)) == set(["10"]) + + +@pytest.mark.parametrize("before", [ + Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + ], + **MALWARE_KWARGS + ), + dict( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + ], + **MALWARE_KWARGS + ), +]) +def test_remove_marking_remove_one_selector_with_multiple_refs(before): + before = markings.remove_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description"]) + assert "granular_markings" not in before + + +def test_remove_marking_remove_multiple_selector_one_ref(): + before = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = markings.remove_markings(before, MARKING_IDS[0], ["description", "modified"]) + assert "granular_markings" not in before + + +def test_remove_marking_mark_one_selector_from_multiple_ones(): + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = markings.remove_markings(before, [MARKING_IDS[0]], ["modified"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_remove_marking_mark_one_selector_markings_from_multiple_ones(): + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + before = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + before = markings.remove_markings(before, [MARKING_IDS[0]], ["modified"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_remove_marking_mark_mutilple_selector_multiple_refs(): + before = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + before = markings.remove_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description", "modified"]) + assert "granular_markings" not in before + + +def test_remove_marking_mark_another_property_same_marking(): + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["modified"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = markings.remove_markings(before, [MARKING_IDS[0]], ["modified"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_remove_marking_mark_same_property_same_marking(): + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = markings.remove_markings(before, [MARKING_IDS[0]], ["description"]) + assert "granular_markings" not in before + + +def test_remove_no_markings(): + before = { + "description": "test description", + } + after = markings.remove_markings(before, ["marking-definition--1"], ["description"]) + assert before == after + + +def test_remove_marking_bad_selector(): + before = { + "description": "test description", + } + with pytest.raises(AssertionError): + markings.remove_markings(before, ["marking-definition--1", "marking-definition--2"], ["title"]) + + +IS_MARKED_TEST_DATA = [ + Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + { + "selectors": ["labels", "description"], + "marking_ref": MARKING_IDS[2] + }, + { + "selectors": ["labels", "description"], + "marking_ref": MARKING_IDS[3] + }, + ], + **MALWARE_KWARGS + ), + dict( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + { + "selectors": ["labels", "description"], + "marking_ref": MARKING_IDS[2] + }, + { + "selectors": ["labels", "description"], + "marking_ref": MARKING_IDS[3] + }, + ], + **MALWARE_KWARGS + ), +] + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_smoke(data): + """Smoke test is_marked call does not fail.""" + assert markings.is_marked(data, selectors=["description"]) + assert markings.is_marked(data, selectors=["modified"]) is False + + +@pytest.mark.parametrize("data,selector", [ + (IS_MARKED_TEST_DATA[0], "foo"), + (IS_MARKED_TEST_DATA[0], ""), + (IS_MARKED_TEST_DATA[0], []), + (IS_MARKED_TEST_DATA[0], [""]), + (IS_MARKED_TEST_DATA[0], "x.z.[-2]"), + (IS_MARKED_TEST_DATA[0], "c.f"), + (IS_MARKED_TEST_DATA[0], "c.[2].i"), + (IS_MARKED_TEST_DATA[1], "c.[3]"), + (IS_MARKED_TEST_DATA[1], "d"), + (IS_MARKED_TEST_DATA[1], "x.[0]"), + (IS_MARKED_TEST_DATA[1], "z.y.w"), + (IS_MARKED_TEST_DATA[1], "x.z.[1]"), + (IS_MARKED_TEST_DATA[1], "x.z.foo3") +]) +def test_is_marked_invalid_selector(data, selector): + """Test invalid selector raises an error.""" + with pytest.raises(AssertionError): + markings.is_marked(data, selectors=selector) + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_mix_selector(data): + """Test valid selector, one marked and one not marked returns True.""" + assert markings.is_marked(data, selectors=["description", "labels"]) + assert markings.is_marked(data, selectors=["description"]) + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_valid_selector_no_refs(data): + """Test that a valid selector return True when it has marking refs and False when not.""" + assert markings.is_marked(data, selectors=["description"]) + assert markings.is_marked(data, [MARKING_IDS[2], MARKING_IDS[3]], ["description"]) + assert markings.is_marked(data, [MARKING_IDS[2]], ["description"]) + assert markings.is_marked(data, [MARKING_IDS[2], MARKING_IDS[5]], ["description"]) is False + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_valid_selector_and_refs(data): + """Test that a valid selector returns True when marking_refs match.""" + assert markings.is_marked(data, [MARKING_IDS[1]], ["description"]) + assert markings.is_marked(data, [MARKING_IDS[1]], ["modified"]) is False + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_valid_selector_multiple_refs(data): + """Test that a valid selector returns True if aall marking_refs match. + Otherwise False.""" + assert markings.is_marked(data, [MARKING_IDS[2], MARKING_IDS[3]], ["labels"]) + assert markings.is_marked(data, [MARKING_IDS[2], MARKING_IDS[1]], ["labels"]) is False + assert markings.is_marked(data, MARKING_IDS[2], ["labels"]) + assert markings.is_marked(data, ["marking-definition--1234"], ["labels"]) is False + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_no_marking_refs(data): + """Test that a valid content selector with no marking_refs returns True + if there is a granular_marking that asserts that field, False + otherwise.""" + assert markings.is_marked(data, selectors=["type"]) is False + assert markings.is_marked(data, selectors=["labels"]) + + +@pytest.mark.parametrize("data", IS_MARKED_TEST_DATA) +def test_is_marked_no_selectors(data): + """Test that we're ensuring 'selectors' is provided.""" + with pytest.raises(TypeError) as excinfo: + markings.granular_markings.is_marked(data) + assert "'selectors' must be provided" in str(excinfo.value) + + +def test_is_marked_positional_arguments_combinations(): + """Test multiple combinations for inherited and descendant markings.""" + test_sdo = \ + { + "a": 333, + "b": "value", + "c": [ + 17, + "list value", + { + "g": "nested", + "h": 45 + } + ], + "x": { + "y": [ + "hello", + 88 + ], + "z": { + "foo1": "bar", + "foo2": 65 + } + }, + "granular_markings": [ + { + "marking_ref": "1", + "selectors": ["a"] + }, + { + "marking_ref": "2", + "selectors": ["c"] + }, + { + "marking_ref": "3", + "selectors": ["c.[1]"] + }, + { + "marking_ref": "4", + "selectors": ["c.[2]"] + }, + { + "marking_ref": "5", + "selectors": ["c.[2].g"] + }, + { + "marking_ref": "6", + "selectors": ["x"] + }, + { + "marking_ref": "7", + "selectors": ["x.y"] + }, + { + "marking_ref": "8", + "selectors": ["x.y.[1]"] + }, + { + "marking_ref": "9", + "selectors": ["x.z"] + }, + { + "marking_ref": "10", + "selectors": ["x.z.foo2"] + }, + ] + } + + assert markings.is_marked(test_sdo, ["1"], "a", False, False) + assert markings.is_marked(test_sdo, ["1"], "a", True, False) + assert markings.is_marked(test_sdo, ["1"], "a", True, True) + assert markings.is_marked(test_sdo, ["1"], "a", False, True) + + assert markings.is_marked(test_sdo, "b", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, "b", inherited=True, descendants=False) is False + assert markings.is_marked(test_sdo, "b", inherited=True, descendants=True) is False + assert markings.is_marked(test_sdo, "b", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["2"], "c", False, False) + assert markings.is_marked(test_sdo, ["2"], "c", True, False) + assert markings.is_marked(test_sdo, ["2", "3", "4", "5"], "c", True, True) + assert markings.is_marked(test_sdo, ["2", "3", "4", "5"], "c", False, True) + + assert markings.is_marked(test_sdo, "c.[0]", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["2"], "c.[0]", True, False) + assert markings.is_marked(test_sdo, ["2"], "c.[0]", True, True) + assert markings.is_marked(test_sdo, "c.[0]", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["3"], "c.[1]", False, False) + assert markings.is_marked(test_sdo, ["2", "3"], "c.[1]", True, False) + assert markings.is_marked(test_sdo, ["2", "3"], "c.[1]", True, True) + assert markings.is_marked(test_sdo, ["3"], "c.[1]", False, True) + + assert markings.is_marked(test_sdo, ["4"], "c.[2]", False, False) + assert markings.is_marked(test_sdo, ["2", "4"], "c.[2]", True, False) + assert markings.is_marked(test_sdo, ["2", "4", "5"], "c.[2]", True, True) + assert markings.is_marked(test_sdo, ["4", "5"], "c.[2]", False, True) + + assert markings.is_marked(test_sdo, ["5"], "c.[2].g", False, False) + assert markings.is_marked(test_sdo, ["2", "4", "5"], "c.[2].g", True, False) + assert markings.is_marked(test_sdo, ["2", "4", "5"], "c.[2].g", True, True) + assert markings.is_marked(test_sdo, ["5"], "c.[2].g", False, True) + + assert markings.is_marked(test_sdo, ["6"], "x", False, False) + assert markings.is_marked(test_sdo, ["6"], "x", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "8", "9", "10"], "x", True, True) + assert markings.is_marked(test_sdo, ["6", "7", "8", "9", "10"], "x", False, True) + + assert markings.is_marked(test_sdo, ["7"], "x.y", False, False) + assert markings.is_marked(test_sdo, ["6", "7"], "x.y", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "8"], "x.y", True, True) + assert markings.is_marked(test_sdo, ["7", "8"], "x.y", False, True) + + assert markings.is_marked(test_sdo, "x.y.[0]", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["6", "7"], "x.y.[0]", True, False) + assert markings.is_marked(test_sdo, ["6", "7"], "x.y.[0]", True, True) + assert markings.is_marked(test_sdo, "x.y.[0]", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["8"], "x.y.[1]", False, False) + assert markings.is_marked(test_sdo, ["6", "7", "8"], "x.y.[1]", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "8"], "x.y.[1]", True, True) + assert markings.is_marked(test_sdo, ["8"], "x.y.[1]", False, True) + + assert markings.is_marked(test_sdo, ["9"], "x.z", False, False) + assert markings.is_marked(test_sdo, ["6", "9"], "x.z", True, False) + assert markings.is_marked(test_sdo, ["6", "9", "10"], "x.z", True, True) + assert markings.is_marked(test_sdo, ["9", "10"], "x.z", False, True) + + assert markings.is_marked(test_sdo, "x.z.foo1", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["6", "9"], "x.z.foo1", True, False) + assert markings.is_marked(test_sdo, ["6", "9"], "x.z.foo1", True, True) + assert markings.is_marked(test_sdo, "x.z.foo1", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["10"], "x.z.foo2", False, False) + assert markings.is_marked(test_sdo, ["6", "9", "10"], "x.z.foo2", True, False) + assert markings.is_marked(test_sdo, ["6", "9", "10"], "x.z.foo2", True, True) + assert markings.is_marked(test_sdo, ["10"], "x.z.foo2", False, True) + + +def test_create_sdo_with_invalid_marking(): + with pytest.raises(AssertionError) as excinfo: + Malware( + granular_markings=[ + { + "selectors": ["foo"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + assert str(excinfo.value) == "Selector foo in Malware is not valid!" + + +def test_set_marking_mark_one_selector_multiple_refs(): + before = Malware( + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + before = markings.set_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_set_marking_mark_multiple_selector_one_refs(): + before = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = markings.set_markings(before, [MARKING_IDS[0]], ["description", "modified"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_set_marking_mark_multiple_selector_multiple_refs_from_none(): + before = Malware( + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["description", "modified"], + "marking_ref": MARKING_IDS[1] + } + ], + **MALWARE_KWARGS + ) + before = markings.set_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], ["description", "modified"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +def test_set_marking_mark_another_property_same_marking(): + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[1] + }, + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[2] + } + ], + **MALWARE_KWARGS + ) + before = markings.set_markings(before, [MARKING_IDS[1], MARKING_IDS[2]], ["description"]) + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +@pytest.mark.parametrize("marking", [ + ([MARKING_IDS[4], MARKING_IDS[5]], ["foo"]), + ([MARKING_IDS[4], MARKING_IDS[5]], ""), + ([MARKING_IDS[4], MARKING_IDS[5]], []), + ([MARKING_IDS[4], MARKING_IDS[5]], [""]), +]) +def test_set_marking_bad_selector(marking): + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + + with pytest.raises(AssertionError): + before = markings.set_markings(before, marking[0], marking[1]) + + assert before == after + + +def test_set_marking_mark_same_property_same_marking(): + before = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + after = Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + } + ], + **MALWARE_KWARGS + ) + before = markings.set_markings(before, [MARKING_IDS[0]], ["description"]) + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + +CLEAR_MARKINGS_TEST_DATA = [ + Malware( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["modified", "description"], + "marking_ref": MARKING_IDS[1] + }, + { + "selectors": ["modified", "description", "type"], + "marking_ref": MARKING_IDS[2] + }, + ], + **MALWARE_KWARGS + ), + dict( + granular_markings=[ + { + "selectors": ["description"], + "marking_ref": MARKING_IDS[0] + }, + { + "selectors": ["modified", "description"], + "marking_ref": MARKING_IDS[1] + }, + { + "selectors": ["modified", "description", "type"], + "marking_ref": MARKING_IDS[2] + }, + ], + **MALWARE_KWARGS + ) +] + + +@pytest.mark.parametrize("data", CLEAR_MARKINGS_TEST_DATA) +def test_clear_marking_smoke(data): + """Test clear_marking call does not fail.""" + data = markings.clear_markings(data, "modified") + assert markings.is_marked(data, "modified") is False + + +@pytest.mark.parametrize("data", CLEAR_MARKINGS_TEST_DATA) +def test_clear_marking_multiple_selectors(data): + """Test clearing markings for multiple selectors effectively removes associated markings.""" + data = markings.clear_markings(data, ["type", "description"]) + assert markings.is_marked(data, ["type", "description"]) is False + + +@pytest.mark.parametrize("data", CLEAR_MARKINGS_TEST_DATA) +def test_clear_marking_one_selector(data): + """Test markings associated with one selector were removed.""" + data = markings.clear_markings(data, "description") + assert markings.is_marked(data, "description") is False + + +@pytest.mark.parametrize("data", CLEAR_MARKINGS_TEST_DATA) +def test_clear_marking_all_selectors(data): + data = markings.clear_markings(data, ["description", "type", "modified"]) + assert markings.is_marked(data, "description") is False + assert "granular_markings" not in data + + +@pytest.mark.parametrize("data,selector", [ + (CLEAR_MARKINGS_TEST_DATA[0], "foo"), + (CLEAR_MARKINGS_TEST_DATA[0], ""), + (CLEAR_MARKINGS_TEST_DATA[1], []), + (CLEAR_MARKINGS_TEST_DATA[1], [""]), +]) +def test_clear_marking_bad_selector(data, selector): + """Test bad selector raises exception.""" + with pytest.raises(AssertionError): + markings.clear_markings(data, selector) diff --git a/stix2/test/test_markings.py b/stix2/test/test_markings.py index 9fe51fb..0c6069a 100644 --- a/stix2/test/test_markings.py +++ b/stix2/test/test_markings.py @@ -103,7 +103,7 @@ def test_marking_def_invalid_type(): stix2.MarkingDefinition( id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", created="2017-01-20T00:00:00.000Z", - definition_type="my-definiition-type", + definition_type="my-definition-type", definition=stix2.StatementMarking("Copyright 2016, Example Corp") ) diff --git a/stix2/test/test_object_markings.py b/stix2/test/test_object_markings.py new file mode 100644 index 0000000..36e8e4d --- /dev/null +++ b/stix2/test/test_object_markings.py @@ -0,0 +1,534 @@ + +import pytest + +from stix2 import Malware, exceptions, markings + +from .constants import FAKE_TIME, MALWARE_ID, MARKING_IDS +from .constants import MALWARE_KWARGS as MALWARE_KWARGS_CONST + +"""Tests for the Data Markings API.""" + +MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy() +MALWARE_KWARGS.update({ + 'id': MALWARE_ID, + 'created': FAKE_TIME, + 'modified': FAKE_TIME, +}) + + +@pytest.mark.parametrize("data", [ + ( + Malware(**MALWARE_KWARGS), + Malware(object_marking_refs=[MARKING_IDS[0]], + **MALWARE_KWARGS), + ), + ( + MALWARE_KWARGS, + dict(object_marking_refs=[MARKING_IDS[0]], + **MALWARE_KWARGS), + ), +]) +def test_add_markings_one_marking(data): + before = data[0] + after = data[1] + + before = markings.add_markings(before, MARKING_IDS[0], None) + + for m in before["object_marking_refs"]: + assert m in after["object_marking_refs"] + + +def test_add_markings_multiple_marking(): + before = Malware( + **MALWARE_KWARGS + ) + + after = Malware( + object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1]], + **MALWARE_KWARGS + ) + + before = markings.add_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], None) + + for m in before["object_marking_refs"]: + assert m in after["object_marking_refs"] + + +def test_add_markings_combination(): + before = Malware( + **MALWARE_KWARGS + ) + after = Malware( + object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1]], + granular_markings=[ + { + "selectors": ["labels"], + "marking_ref": MARKING_IDS[2] + }, + { + "selectors": ["name"], + "marking_ref": MARKING_IDS[3] + } + ], + **MALWARE_KWARGS + ) + + before = markings.add_markings(before, MARKING_IDS[0], None) + before = markings.add_markings(before, MARKING_IDS[1], None) + before = markings.add_markings(before, MARKING_IDS[2], "labels") + before = markings.add_markings(before, MARKING_IDS[3], "name") + + for m in before["granular_markings"]: + assert m in after["granular_markings"] + + for m in before["object_marking_refs"]: + assert m in after["object_marking_refs"] + + +@pytest.mark.parametrize("data", [ + ([""]), + (""), + ([]), + ([MARKING_IDS[0], 456]) +]) +def test_add_markings_bad_markings(data): + before = Malware( + **MALWARE_KWARGS + ) + with pytest.raises(exceptions.InvalidValueError): + before = markings.add_markings(before, data, None) + + assert "object_marking_refs" not in before + + +GET_MARKINGS_TEST_DATA = \ + { + "a": 333, + "b": "value", + "c": [ + 17, + "list value", + { + "g": "nested", + "h": 45 + } + ], + "x": { + "y": [ + "hello", + 88 + ], + "z": { + "foo1": "bar", + "foo2": 65 + } + }, + "object_marking_refs": ["11"], + "granular_markings": [ + { + "marking_ref": "1", + "selectors": ["a"] + }, + { + "marking_ref": "2", + "selectors": ["c"] + }, + { + "marking_ref": "3", + "selectors": ["c.[1]"] + }, + { + "marking_ref": "4", + "selectors": ["c.[2]"] + }, + { + "marking_ref": "5", + "selectors": ["c.[2].g"] + }, + { + "marking_ref": "6", + "selectors": ["x"] + }, + { + "marking_ref": "7", + "selectors": ["x.y"] + }, + { + "marking_ref": "8", + "selectors": ["x.y.[1]"] + }, + { + "marking_ref": "9", + "selectors": ["x.z"] + }, + { + "marking_ref": "10", + "selectors": ["x.z.foo2"] + }, + ] + } + + +@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA]) +def test_get_markings_object_marking(data): + assert set(markings.get_markings(data, None)) == set(["11"]) + + +@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA]) +def test_get_markings_object_and_granular_combinations(data): + """Test multiple combinations for inherited and descendant markings.""" + assert set(markings.get_markings(data, "a", False, False)) == set(["1"]) + assert set(markings.get_markings(data, "a", True, False)) == set(["1", "11"]) + assert set(markings.get_markings(data, "a", True, True)) == set(["1", "11"]) + assert set(markings.get_markings(data, "a", False, True)) == set(["1"]) + + assert set(markings.get_markings(data, "b", False, False)) == set([]) + assert set(markings.get_markings(data, "b", True, False)) == set(["11"]) + assert set(markings.get_markings(data, "b", True, True)) == set(["11"]) + assert set(markings.get_markings(data, "b", False, True)) == set([]) + + assert set(markings.get_markings(data, "c", False, False)) == set(["2"]) + assert set(markings.get_markings(data, "c", True, False)) == set(["2", "11"]) + assert set(markings.get_markings(data, "c", True, True)) == set(["2", "3", "4", "5", "11"]) + assert set(markings.get_markings(data, "c", False, True)) == set(["2", "3", "4", "5"]) + + assert set(markings.get_markings(data, "c.[0]", False, False)) == set([]) + assert set(markings.get_markings(data, "c.[0]", True, False)) == set(["2", "11"]) + assert set(markings.get_markings(data, "c.[0]", True, True)) == set(["2", "11"]) + assert set(markings.get_markings(data, "c.[0]", False, True)) == set([]) + + assert set(markings.get_markings(data, "c.[1]", False, False)) == set(["3"]) + assert set(markings.get_markings(data, "c.[1]", True, False)) == set(["2", "3", "11"]) + assert set(markings.get_markings(data, "c.[1]", True, True)) == set(["2", "3", "11"]) + assert set(markings.get_markings(data, "c.[1]", False, True)) == set(["3"]) + + assert set(markings.get_markings(data, "c.[2]", False, False)) == set(["4"]) + assert set(markings.get_markings(data, "c.[2]", True, False)) == set(["2", "4", "11"]) + assert set(markings.get_markings(data, "c.[2]", True, True)) == set(["2", "4", "5", "11"]) + assert set(markings.get_markings(data, "c.[2]", False, True)) == set(["4", "5"]) + + assert set(markings.get_markings(data, "c.[2].g", False, False)) == set(["5"]) + assert set(markings.get_markings(data, "c.[2].g", True, False)) == set(["2", "4", "5", "11"]) + assert set(markings.get_markings(data, "c.[2].g", True, True)) == set(["2", "4", "5", "11"]) + assert set(markings.get_markings(data, "c.[2].g", False, True)) == set(["5"]) + + assert set(markings.get_markings(data, "x", False, False)) == set(["6"]) + assert set(markings.get_markings(data, "x", True, False)) == set(["6", "11"]) + assert set(markings.get_markings(data, "x", True, True)) == set(["6", "7", "8", "9", "10", "11"]) + assert set(markings.get_markings(data, "x", False, True)) == set(["6", "7", "8", "9", "10"]) + + assert set(markings.get_markings(data, "x.y", False, False)) == set(["7"]) + assert set(markings.get_markings(data, "x.y", True, False)) == set(["6", "7", "11"]) + assert set(markings.get_markings(data, "x.y", True, True)) == set(["6", "7", "8", "11"]) + assert set(markings.get_markings(data, "x.y", False, True)) == set(["7", "8"]) + + assert set(markings.get_markings(data, "x.y.[0]", False, False)) == set([]) + assert set(markings.get_markings(data, "x.y.[0]", True, False)) == set(["6", "7", "11"]) + assert set(markings.get_markings(data, "x.y.[0]", True, True)) == set(["6", "7", "11"]) + assert set(markings.get_markings(data, "x.y.[0]", False, True)) == set([]) + + assert set(markings.get_markings(data, "x.y.[1]", False, False)) == set(["8"]) + assert set(markings.get_markings(data, "x.y.[1]", True, False)) == set(["6", "7", "8", "11"]) + assert set(markings.get_markings(data, "x.y.[1]", True, True)) == set(["6", "7", "8", "11"]) + assert set(markings.get_markings(data, "x.y.[1]", False, True)) == set(["8"]) + + assert set(markings.get_markings(data, "x.z", False, False)) == set(["9"]) + assert set(markings.get_markings(data, "x.z", True, False)) == set(["6", "9", "11"]) + assert set(markings.get_markings(data, "x.z", True, True)) == set(["6", "9", "10", "11"]) + assert set(markings.get_markings(data, "x.z", False, True)) == set(["9", "10"]) + + assert set(markings.get_markings(data, "x.z.foo1", False, False)) == set([]) + assert set(markings.get_markings(data, "x.z.foo1", True, False)) == set(["6", "9", "11"]) + assert set(markings.get_markings(data, "x.z.foo1", True, True)) == set(["6", "9", "11"]) + assert set(markings.get_markings(data, "x.z.foo1", False, True)) == set([]) + + assert set(markings.get_markings(data, "x.z.foo2", False, False)) == set(["10"]) + assert set(markings.get_markings(data, "x.z.foo2", True, False)) == set(["6", "9", "10", "11"]) + assert set(markings.get_markings(data, "x.z.foo2", True, True)) == set(["6", "9", "10", "11"]) + assert set(markings.get_markings(data, "x.z.foo2", False, True)) == set(["10"]) + + +@pytest.mark.parametrize("data", [ + ( + Malware(object_marking_refs=[MARKING_IDS[0]], + **MALWARE_KWARGS), + Malware(**MALWARE_KWARGS), + ), + ( + dict(object_marking_refs=[MARKING_IDS[0]], + **MALWARE_KWARGS), + MALWARE_KWARGS, + ), +]) +def test_remove_markings_object_level(data): + before = data[0] + after = data[1] + + before = markings.remove_markings(before, MARKING_IDS[0], None) + + assert 'object_marking_refs' not in before + assert 'object_marking_refs' not in after + + modified = after['modified'] + after = markings.remove_markings(after, MARKING_IDS[0], None) + modified == after['modified'] + + +@pytest.mark.parametrize("data", [ + ( + Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS), + Malware(object_marking_refs=[MARKING_IDS[1]], + **MALWARE_KWARGS), + ), + ( + dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS), + dict(object_marking_refs=[MARKING_IDS[1]], + **MALWARE_KWARGS), + ), +]) +def test_remove_markings_multiple(data): + before = data[0] + after = data[1] + + before = markings.remove_markings(before, [MARKING_IDS[0], MARKING_IDS[2]], None) + + assert before['object_marking_refs'] == after['object_marking_refs'] + + +def test_remove_markings_bad_markings(): + before = Malware( + object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS + ) + with pytest.raises(AssertionError) as excinfo: + markings.remove_markings(before, [MARKING_IDS[4]], None) + assert str(excinfo.value) == "Marking ['%s'] was not found in Malware!" % MARKING_IDS[4] + + +@pytest.mark.parametrize("data", [ + ( + Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS), + Malware(**MALWARE_KWARGS), + ), + ( + dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS), + MALWARE_KWARGS, + ), +]) +def test_clear_markings(data): + before = data[0] + after = data[1] + + before = markings.clear_markings(before, None) + + assert 'object_marking_refs' not in before + assert 'object_marking_refs' not in after + + +def test_is_marked_object_and_granular_combinations(): + """Test multiple combinations for inherited and descendant markings.""" + test_sdo = \ + { + "a": 333, + "b": "value", + "c": [ + 17, + "list value", + { + "g": "nested", + "h": 45 + } + ], + "x": { + "y": [ + "hello", + 88 + ], + "z": { + "foo1": "bar", + "foo2": 65 + } + }, + "object_marking_refs": "11", + "granular_markings": [ + { + "marking_ref": "1", + "selectors": ["a"] + }, + { + "marking_ref": "2", + "selectors": ["c"] + }, + { + "marking_ref": "3", + "selectors": ["c.[1]"] + }, + { + "marking_ref": "4", + "selectors": ["c.[2]"] + }, + { + "marking_ref": "5", + "selectors": ["c.[2].g"] + }, + { + "marking_ref": "6", + "selectors": ["x"] + }, + { + "marking_ref": "7", + "selectors": ["x.y"] + }, + { + "marking_ref": "8", + "selectors": ["x.y.[1]"] + }, + { + "marking_ref": "9", + "selectors": ["x.z"] + }, + { + "marking_ref": "10", + "selectors": ["x.z.foo2"] + }, + ] + } + + assert markings.is_marked(test_sdo, ["1"], "a", False, False) + assert markings.is_marked(test_sdo, ["1", "11"], "a", True, False) + assert markings.is_marked(test_sdo, ["1", "11"], "a", True, True) + assert markings.is_marked(test_sdo, ["1"], "a", False, True) + + assert markings.is_marked(test_sdo, "b", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["11"], "b", True, False) + assert markings.is_marked(test_sdo, ["11"], "b", True, True) + assert markings.is_marked(test_sdo, "b", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["2"], "c", False, False) + assert markings.is_marked(test_sdo, ["2", "11"], "c", True, False) + assert markings.is_marked(test_sdo, ["2", "3", "4", "5", "11"], "c", True, True) + assert markings.is_marked(test_sdo, ["2", "3", "4", "5"], "c", False, True) + + assert markings.is_marked(test_sdo, "c.[0]", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["2", "11"], "c.[0]", True, False) + assert markings.is_marked(test_sdo, ["2", "11"], "c.[0]", True, True) + assert markings.is_marked(test_sdo, "c.[0]", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["3"], "c.[1]", False, False) + assert markings.is_marked(test_sdo, ["2", "3", "11"], "c.[1]", True, False) + assert markings.is_marked(test_sdo, ["2", "3", "11"], "c.[1]", True, True) + assert markings.is_marked(test_sdo, ["3"], "c.[1]", False, True) + + assert markings.is_marked(test_sdo, ["4"], "c.[2]", False, False) + assert markings.is_marked(test_sdo, ["2", "4", "11"], "c.[2]", True, False) + assert markings.is_marked(test_sdo, ["2", "4", "5", "11"], "c.[2]", True, True) + assert markings.is_marked(test_sdo, ["4", "5"], "c.[2]", False, True) + + assert markings.is_marked(test_sdo, ["5"], "c.[2].g", False, False) + assert markings.is_marked(test_sdo, ["2", "4", "5", "11"], "c.[2].g", True, False) + assert markings.is_marked(test_sdo, ["2", "4", "5", "11"], "c.[2].g", True, True) + assert markings.is_marked(test_sdo, ["5"], "c.[2].g", False, True) + + assert markings.is_marked(test_sdo, ["6"], "x", False, False) + assert markings.is_marked(test_sdo, ["6", "11"], "x", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "8", "9", "10", "11"], "x", True, True) + assert markings.is_marked(test_sdo, ["6", "7", "8", "9", "10"], "x", False, True) + + assert markings.is_marked(test_sdo, ["7"], "x.y", False, False) + assert markings.is_marked(test_sdo, ["6", "7", "11"], "x.y", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "8", "11"], "x.y", True, True) + assert markings.is_marked(test_sdo, ["7", "8"], "x.y", False, True) + + assert markings.is_marked(test_sdo, "x.y.[0]", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["6", "7", "11"], "x.y.[0]", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "11"], "x.y.[0]", True, True) + assert markings.is_marked(test_sdo, "x.y.[0]", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["8"], "x.y.[1]", False, False) + assert markings.is_marked(test_sdo, ["6", "7", "8", "11"], "x.y.[1]", True, False) + assert markings.is_marked(test_sdo, ["6", "7", "8", "11"], "x.y.[1]", True, True) + assert markings.is_marked(test_sdo, ["8"], "x.y.[1]", False, True) + + assert markings.is_marked(test_sdo, ["9"], "x.z", False, False) + assert markings.is_marked(test_sdo, ["6", "9", "11"], "x.z", True, False) + assert markings.is_marked(test_sdo, ["6", "9", "10", "11"], "x.z", True, True) + assert markings.is_marked(test_sdo, ["9", "10"], "x.z", False, True) + + assert markings.is_marked(test_sdo, "x.z.foo1", inherited=False, descendants=False) is False + assert markings.is_marked(test_sdo, ["6", "9", "11"], "x.z.foo1", True, False) + assert markings.is_marked(test_sdo, ["6", "9", "11"], "x.z.foo1", True, True) + assert markings.is_marked(test_sdo, "x.z.foo1", inherited=False, descendants=True) is False + + assert markings.is_marked(test_sdo, ["10"], "x.z.foo2", False, False) + assert markings.is_marked(test_sdo, ["6", "9", "10", "11"], "x.z.foo2", True, False) + assert markings.is_marked(test_sdo, ["6", "9", "10", "11"], "x.z.foo2", True, True) + assert markings.is_marked(test_sdo, ["10"], "x.z.foo2", False, True) + + assert markings.is_marked(test_sdo, ["11"], None, True, True) + assert markings.is_marked(test_sdo, ["2"], None, True, True) is False + + +@pytest.mark.parametrize("data", [ + ( + Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS), + Malware(**MALWARE_KWARGS), + ), + ( + dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS), + MALWARE_KWARGS, + ), +]) +def test_is_marked_no_markings(data): + marked = data[0] + nonmarked = data[1] + + assert markings.is_marked(marked) + assert markings.is_marked(nonmarked) is False + + +def test_set_marking(): + before = Malware( + object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]], + **MALWARE_KWARGS + ) + after = Malware( + object_marking_refs=[MARKING_IDS[4], MARKING_IDS[5]], + **MALWARE_KWARGS + ) + + before = markings.set_markings(before, [MARKING_IDS[4], MARKING_IDS[5]], None) + + for m in before["object_marking_refs"]: + assert m in [MARKING_IDS[4], MARKING_IDS[5]] + + assert [MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]] not in before["object_marking_refs"] + + for x in before["object_marking_refs"]: + assert x in after["object_marking_refs"] + + +@pytest.mark.parametrize("data", [ + ([]), + ([""]), + (""), + ([MARKING_IDS[4], 687]) +]) +def test_set_marking_bad_input(data): + before = Malware( + object_marking_refs=[MARKING_IDS[0]], + **MALWARE_KWARGS + ) + after = Malware( + object_marking_refs=[MARKING_IDS[0]], + **MALWARE_KWARGS + ) + with pytest.raises(exceptions.InvalidValueError): + before = markings.set_markings(before, data, None) + + assert before == after diff --git a/stix2/test/test_versioning.py b/stix2/test/test_versioning.py index 453abc0..8695a30 100644 --- a/stix2/test/test_versioning.py +++ b/stix2/test/test_versioning.py @@ -2,16 +2,11 @@ import pytest import stix2 +from .constants import CAMPAIGN_MORE_KWARGS + def test_making_new_version(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v2 = campaign_v1.new_version(name="fred") @@ -25,14 +20,7 @@ def test_making_new_version(): def test_making_new_version_with_unset(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v2 = campaign_v1.new_version(description=None) @@ -47,16 +35,11 @@ def test_making_new_version_with_unset(): def test_making_new_version_with_embedded_object(): campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", external_references=[{ "source_name": "capec", "external_id": "CAPEC-163" }], - description="Campaign by Green Group against a series of targets in the financial services sector." + **CAMPAIGN_MORE_KWARGS ) campaign_v2 = campaign_v1.new_version(external_references=[{ @@ -74,14 +57,7 @@ def test_making_new_version_with_embedded_object(): def test_revoke(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v2 = campaign_v1.revoke() @@ -96,14 +72,7 @@ def test_revoke(): def test_versioning_error_invalid_property(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as excinfo: campaign_v1.new_version(type="threat-actor") @@ -112,14 +81,7 @@ def test_versioning_error_invalid_property(): def test_versioning_error_bad_modified_value(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: campaign_v1.new_version(modified="2015-04-06T20:03:00.000Z") @@ -135,14 +97,7 @@ def test_versioning_error_bad_modified_value(): def test_versioning_error_usetting_required_property(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo: campaign_v1.new_version(name=None) @@ -156,15 +111,7 @@ def test_versioning_error_usetting_required_property(): def test_versioning_error_new_version_of_revoked(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) - + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v2 = campaign_v1.revoke() with pytest.raises(stix2.exceptions.RevokeError) as excinfo: @@ -176,15 +123,7 @@ def test_versioning_error_new_version_of_revoked(): def test_versioning_error_revoke_of_revoked(): - campaign_v1 = stix2.Campaign( - id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", - created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", - created="2016-04-06T20:03:00.000Z", - modified="2016-04-06T20:03:00.000Z", - name="Green Group Attacks Against Finance", - description="Campaign by Green Group against a series of targets in the financial services sector." - ) - + campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS) campaign_v2 = campaign_v1.revoke() with pytest.raises(stix2.exceptions.RevokeError) as excinfo: @@ -193,3 +132,77 @@ def test_versioning_error_revoke_of_revoked(): assert excinfo.value.called_by == "revoke" assert str(excinfo.value) == "Cannot revoke an already revoked object." + + +def test_making_new_version_dict(): + campaign_v1 = CAMPAIGN_MORE_KWARGS + campaign_v2 = stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, name="fred") + + assert campaign_v1['id'] == campaign_v2['id'] + assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref'] + assert campaign_v1['created'] == campaign_v2['created'] + assert campaign_v1['name'] != campaign_v2['name'] + assert campaign_v2['name'] == "fred" + assert campaign_v1['description'] == campaign_v2['description'] + assert stix2.utils.parse_into_datetime(campaign_v1['modified'], precision='millisecond') < campaign_v2['modified'] + + +def test_versioning_error_dict_bad_modified_value(): + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z") + + assert excinfo.value.cls == dict + assert excinfo.value.prop_name == "modified" + assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime." + + +def test_versioning_error_dict_no_modified_value(): + campaign_v1 = { + 'type': 'campaign', + 'id': "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + 'created': "2016-04-06T20:03:00.000Z", + 'name': "Green Group Attacks Against Finance", + } + campaign_v2 = stix2.utils.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z") + + assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z" + + +def test_making_new_version_invalid_cls(): + campaign_v1 = "This is a campaign." + with pytest.raises(ValueError) as excinfo: + stix2.utils.new_version(campaign_v1, name="fred") + + assert 'cannot create new version of object of this type' in str(excinfo.value) + + +def test_revoke_dict(): + campaign_v1 = CAMPAIGN_MORE_KWARGS + campaign_v2 = stix2.utils.revoke(campaign_v1) + + assert campaign_v1['id'] == campaign_v2['id'] + assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref'] + assert campaign_v1['created'] == campaign_v2['created'] + assert campaign_v1['name'] == campaign_v2['name'] + assert campaign_v1['description'] == campaign_v2['description'] + assert stix2.utils.parse_into_datetime(campaign_v1['modified'], precision='millisecond') < campaign_v2['modified'] + + assert campaign_v2['revoked'] + + +def test_versioning_error_revoke_of_revoked_dict(): + campaign_v1 = CAMPAIGN_MORE_KWARGS + campaign_v2 = stix2.utils.revoke(campaign_v1) + + with pytest.raises(stix2.exceptions.RevokeError) as excinfo: + stix2.utils.revoke(campaign_v2) + + assert excinfo.value.called_by == "revoke" + + +def test_revoke_invalid_cls(): + campaign_v1 = "This is a campaign." + with pytest.raises(ValueError) as excinfo: + stix2.utils.revoke(campaign_v1) + + assert 'cannot revoke object of this type' in str(excinfo.value) diff --git a/stix2/utils.py b/stix2/utils.py index de481fc..ca195f6 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -1,11 +1,16 @@ """Utility functions and classes for the stix2 library.""" +from collections import Mapping +import copy import datetime as dt import json from dateutil import parser import pytz +from .exceptions import (InvalidValueError, RevokeError, + UnmodifiablePropertyError) + # Sentinel value for properties that should be set to the current time. # We can't use the standard 'default' approach, since if there are multiple # timestamps in a single object, the timestamps will vary by a few microseconds. @@ -150,3 +155,51 @@ def find_property_index(obj, properties, tuple_to_find): tuple_to_find) if val is not None: return val + + +def new_version(data, **kwargs): + """Create a new version of a STIX object, by modifying properties and + updating the `modified` property. + """ + + if not isinstance(data, Mapping): + raise ValueError('cannot create new version of object of this type! ' + 'Try a dictionary or instance of an SDO or SRO class.') + + unchangable_properties = [] + if data.get("revoked"): + raise RevokeError("new_version") + try: + new_obj_inner = copy.deepcopy(data._inner) + except AttributeError: + new_obj_inner = copy.deepcopy(data) + properties_to_change = kwargs.keys() + + # Make sure certain properties aren't trying to change + for prop in ["created", "created_by_ref", "id", "type"]: + if prop in properties_to_change: + unchangable_properties.append(prop) + if unchangable_properties: + raise UnmodifiablePropertyError(unchangable_properties) + + cls = type(data) + if 'modified' not in kwargs: + kwargs['modified'] = get_timestamp() + elif 'modified' in data: + old_modified_property = parse_into_datetime(data.get('modified'), precision='millisecond') + new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond') + if new_modified_property < old_modified_property: + raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.") + new_obj_inner.update(kwargs) + # Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass + return cls(**{k: v for k, v in new_obj_inner.items() if v is not None}) + + +def revoke(data): + if not isinstance(data, Mapping): + raise ValueError('cannot revoke object of this type! Try a dictionary ' + 'or instance of an SDO or SRO class.') + + if data.get("revoked"): + raise RevokeError("revoke") + return new_version(data, revoked=True) diff --git a/tox.ini b/tox.ini index b1265ec..bca18c3 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py26,py27,py33,py34,py35,py36,pycodestyle,isort-check +envlist = py27,py33,py34,py35,py36,pycodestyle,isort-check [testenv] deps = @@ -36,7 +36,6 @@ commands = [travis] python = - 2.6: py26 2.7: py27, pycodestyle 3.3: py33, pycodestyle 3.4: py34, pycodestyle