diff --git a/stix2/markings/__init__.py b/stix2/markings/__init__.py index e69de29..8301131 100644 --- a/stix2/markings/__init__.py +++ b/stix2/markings/__init__.py @@ -0,0 +1,197 @@ +""" +Python STIX 2.0 Data Markings API. + +These high level functions will operate on both object level markings and +granular markings unless otherwise noted in each of the functions. +""" + +from stix2.markings import utils +from stix2.markings import granular_markings +from stix2.markings import object_markings + + +def get_markings(obj, selectors, inherited=False, descendants=False): + """ + Get all markings associated to the field(s). + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + inherited: If True, include object level markings and granular markings + inherited relative to the field(s). + descendants: If True, include granular markings applied to any children + relative to the field(s). + + Returns: + list: Marking IDs that matched the selectors expression. + + Note: + If ``selectors`` is None, operation will be performed only on object + level markings. + + """ + if selectors is None: + return object_markings.get_markings(obj) + + results = granular_markings.get_markings( + obj, + selectors, + inherited, + descendants + ) + + if inherited: + results.extend(object_markings.get_markings(obj)) + + return list(set(results)) + + +def set_markings(obj, selectors, marking): + """ + Removes all markings associated with selectors and appends a new granular + marking. Refer to `clear_markings` and `add_markings` for details. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + object_markings.set_markings(obj, marking) + else: + granular_markings.set_markings(obj, selectors, marking) + + +def remove_markings(obj, selectors, marking): + """ + Removes granular_marking from the granular_markings collection. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + + Raises: + AssertionError: If `selectors` or `marking` fail data validation. Also + if markings to remove are not found on the provided TLO. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + object_markings.remove_markings(obj, marking) + else: + granular_markings.remove_markings(obj, selectors, marking) + + +def add_markings(obj, selectors, marking): + """ + Appends a granular_marking to the granular_markings collection. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + + Raises: + AssertionError: If `selectors` or `marking` fail data validation. + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + object_markings.add_markings(obj, marking) + else: + granular_markings.add_markings(obj, selectors, marking) + + +def clear_markings(obj, selectors): + """ + Removes all granular_marking associated with the selectors. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + + Note: + If ``selectors`` is None, operations will be performed on object level + markings. Otherwise on granular markings. + + """ + if selectors is None: + object_markings.clear_markings(obj) + else: + granular_markings.clear_markings(obj, selectors) + + +def is_marked(obj, selectors, marking=None, inherited=False, descendants=False): + """ + Checks if field(s) is marked by any marking or by specific marking(s). + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + inherited: If True, include object level markings and granular markings + inherited to determine if the field(s) is/are marked. + descendants: If True, include granular markings applied to any children + of the given selector to determine if the field(s) is/are marked. + + Returns: + bool: True if ``selectors`` is found on internal TLO collection. + False otherwise. + + Note: + When a list of marking IDs is provided, if ANY of the provided marking + IDs matches, True is returned. + + If ``selectors`` is None, operation will be performed only on object + level markings. + + """ + if selectors is None: + return object_markings.is_marked(obj, marking) + + result = granular_markings.is_marked( + obj, + selectors, + marking, + inherited, + descendants + ) + + if inherited: + granular_marks = granular_markings.get_markings(obj, selectors) + object_marks = object_markings.get_markings(obj) + + if granular_marks: + result = granular_markings.is_marked( + obj, + selectors, + granular_marks, + inherited, + descendants + ) + + result = result or object_markings.is_marked(obj, object_marks) + + return result diff --git a/stix2/markings/granular_markings.py b/stix2/markings/granular_markings.py index e69de29..6d69f16 100644 --- a/stix2/markings/granular_markings.py +++ b/stix2/markings/granular_markings.py @@ -0,0 +1,237 @@ + +from stix2.markings import utils + + +def get_markings(obj, selectors, inherited=False, descendants=False): + """ + Get all markings associated to the field(s). + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + inherited: If True, include markings inherited relative to the field(s). + descendants: If True, include granular markings applied to any children + relative to the field(s). + + Returns: + list: Marking IDs that matched the selectors expression. + + """ + selectors = utils.fix_value(selectors) + utils.validate(obj, selectors) + + granular_markings = obj.get("granular_markings", []) + + if not granular_markings: + return [] + + results = set() + + for marking in granular_markings: + for user_selector in selectors: + for marking_selector in marking.get("selectors", []): + if any([(user_selector == marking_selector), # Catch explicit selectors. + (user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors. + (marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors + refs = marking.get("marking_ref", []) + results.update([refs]) + + return list(results) + + +def set_markings(obj, selectors, marking): + """ + Removes all markings associated with selectors and appends a new granular + marking. Refer to `clear_markings` and `add_markings` for details. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + + """ + clear_markings(obj, selectors) + add_markings(obj, selectors, marking) + + +def remove_markings(obj, selectors, marking): + """ + Removes granular_marking from the granular_markings collection. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + + Raises: + AssertionError: If `selectors` or `marking` fail data validation. Also + if markings to remove are not found on the provided TLO. + + """ + selectors = utils.fix_value(selectors) + utils.validate(obj, selectors, marking) + + utils.expand_markings(obj) + + granular_markings = obj.get("granular_markings") + + if not granular_markings: + return + + tlo = utils.build_granular_marking( + {"selectors": selectors, "marking_ref": marking} + ) + + remove = tlo.get("granular_markings", []) + + if not any(marking in granular_markings for marking in remove): + raise AssertionError("Unable to remove Granular Marking(s) from" + " internal collection. Marking(s) not found...") + + obj["granular_markings"] = [ + m for m in granular_markings if m not in remove + ] + + utils.compress_markings(obj) + + if not obj.get("granular_markings"): + obj.pop("granular_markings") + + +def add_markings(obj, selectors, marking): + """ + Appends a granular_marking to the granular_markings collection. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + + Raises: + AssertionError: If `selectors` or `marking` fail data validation. + + """ + selectors = utils.fix_value(selectors) + utils.validate(obj, selectors, marking) + + granular_marking = {"selectors": sorted(selectors), "marking_ref": marking} + + if not obj.get("granular_markings"): + obj["granular_markings"] = list() + + obj["granular_markings"].append(granular_marking) + + utils.expand_markings(obj) + utils.compress_markings(obj) + + +def clear_markings(obj, selectors): + """ + Removes all granular_marking associated with the selectors. + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + + Raises: + AssertionError: If `selectors` or `marking` fail data validation. Also + if markings to remove are not found on the provided TLO. + + """ + selectors = utils.fix_value(selectors) + utils.validate(obj, selectors) + + utils.expand_markings(obj) + + granular_markings = obj.get("granular_markings") + + if not granular_markings: + return + + tlo = utils.build_granular_marking( + {"selectors": selectors, "marking_ref": ["N/A"]} + ) + + clear = tlo.get("granular_markings", []) + + if not any(clear_selector in tlo_selectors.get("selectors", []) + for tlo_selectors in granular_markings + for clear_marking in clear + for clear_selector in clear_marking.get("selectors", []) + ): + raise AssertionError("Unable to clear Granular Marking(s) from" + " internal collection. Selector(s) not found...") + + for granular_marking in granular_markings: + for s in selectors: + if s in granular_marking.get("selectors", []): + marking_refs = granular_marking.get("marking_ref") + + if marking_refs: + granular_marking["marking_ref"] = list() + + utils.compress_markings(obj) + + if not obj.get("granular_markings"): + obj.pop("granular_markings") + + +def is_marked(obj, selectors, marking=None, inherited=False, descendants=False): + """ + Checks if field is marked by any marking or by specific marking(s). + + Args: + obj: A TLO object. + selectors: string or list of selectors strings relative to the TLO in + which the field(s) appear(s). + marking: identifier or list of marking identifiers that apply to the + field(s) selected by `selectors`. + inherited: If True, return markings inherited from the given selector. + descendants: If True, return granular markings applied to any children + of the given selector. + + Returns: + bool: True if ``selectors`` is found on internal TLO collection. + False otherwise. + + Note: + When a list of marking IDs is provided, if ANY of the provided marking + IDs matches, True is returned. + + """ + selectors = utils.fix_value(selectors) + marking = utils.fix_value(marking) + utils.validate(obj, selectors, marking) + + granular_markings = obj.get("granular_markings", []) + + marked = False + markings = set() + + for granular_marking in granular_markings: + for user_selector in selectors: + for marking_selector in granular_marking.get("selectors", []): + + if any([(user_selector == marking_selector), # Catch explicit selectors. + (user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors. + (marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors + marking_ref = granular_marking.get("marking_ref", "") + + if marking and any(x == marking_ref for x in marking): + markings.update([marking_ref]) + + marked = True + + if marking: + # All user-provided markings must be found. + return markings.issuperset(set(marking)) + + return marked diff --git a/stix2/markings/object_markings.py b/stix2/markings/object_markings.py index e69de29..33a8d2a 100644 --- a/stix2/markings/object_markings.py +++ b/stix2/markings/object_markings.py @@ -0,0 +1,140 @@ + +import six + +from stix2.markings import utils + + +def get_markings(obj): + """ + Get all object level markings from the given TLO object. + + Args: + obj: A TLO object. + + Returns: + list: Marking IDs contained in the TLO. + + """ + object_markings = obj.get("object_marking_refs", []) + + if not object_markings: + return [] + elif isinstance(object_markings, six.string_types): + return [object_markings] + else: + return object_markings + + +def add_markings(obj, marking): + """ + Appends an object level marking to the object_marking_refs collection. + + Args: + obj: A TLO object. + marking: identifier or list of marking identifiers that apply to the + TLO object. + + Raises: + AssertionError: If `marking` fail data validation. + + """ + marking = utils.convert_to_list(marking) + utils.validate(obj, marking=marking) + + if not obj.get("object_marking_refs"): + obj["object_marking_refs"] = list() + + object_markings = set(obj.get("object_marking_refs") + marking) + + obj["object_marking_refs"] = list(object_markings) + + +def remove_markings(obj, marking): + """ + Removes object level marking from the object_marking_refs collection. + + Args: + obj: A TLO object. + marking: identifier or list of marking identifiers that apply to the + TLO object. + + Raises: + AssertionError: If `marking` fail data validation. Also + if markings to remove are not found on the provided TLO. + + """ + marking = utils.convert_to_list(marking) + utils.validate(obj, marking=marking) + + object_markings = obj.get("object_marking_refs", []) + + if not object_markings: + return [] + + if any(x not in obj["object_marking_refs"] for x in marking): + raise AssertionError("Unable to remove Object Level Marking(s) from " + "internal collection. Marking(s) not found...") + + obj["object_marking_refs"] = [x for x in object_markings + if x not in marking] + + if not obj.get("object_marking_refs"): + obj.pop("object_marking_refs") + + +def set_markings(obj, marking): + """ + Removes all object level markings and appends new object level markings to + the collection. Refer to `clear_markings` and `add_markings` for details. + + Args: + obj: A TLO object. + marking: identifier or list of marking identifiers that apply to the + TLO object. + + """ + utils.validate(obj, marking=marking) + + clear_markings(obj) + add_markings(obj, marking) + + +def clear_markings(obj): + """ + Removes all object level markings from the object_marking_refs collection. + + Args: + obj: A TLO object. + + """ + try: + del obj["object_marking_refs"] + except KeyError: + raise AssertionError("Unable to clear Object Marking(s) from internal" + " collection. No Markings in object...") + + +def is_marked(obj, marking=None): + """ + Checks if TLO is marked by any marking or by specific marking(s). + + Args: + obj: A TLO object. + marking: identifier or list of marking identifiers that apply to the + TLO object. + + Returns: + bool: True if TLO has object level markings. False otherwise. + + Note: + When a list of marking IDs is provided, if ANY of the provided marking + IDs matches, True is returned. + + """ + marking = utils.convert_to_list(marking) + object_markings = obj.get("object_marking_refs", []) + + if marking: + return any(x in object_markings for x in marking) + else: + return bool(object_markings) diff --git a/stix2/markings/utils.py b/stix2/markings/utils.py index e69de29..9286695 100644 --- a/stix2/markings/utils.py +++ b/stix2/markings/utils.py @@ -0,0 +1,228 @@ + +import collections + +import six + + +def evaluate_expression(obj, selector): + + for items, value in iterpath(obj): + path = ".".join(items) + + if path == selector and value: + return [value] + + return [] + + +def validate_selector(obj, selector): + results = list(evaluate_expression(obj, selector)) + + if len(results) >= 1: + return True + + +def validate_markings(marking): + if isinstance(marking, six.string_types): + if not marking: + return False + else: + return True + + elif isinstance(marking, list) and len(marking) >= 1: + for m in marking: + if not m: + return False + elif not isinstance(m, six.string_types): + return False + + return True + else: + return False + + +def validate(obj, selectors=None, marking=None): + + if selectors is not None: + assert selectors + + for s in selectors: + assert validate_selector(obj, s) + + if marking is not None: + assert validate_markings(marking) + + +def convert_to_list(data): + if data is not None: + if isinstance(data, list): + return data + else: + return [data] + + +def fix_value(data): + data = convert_to_list(data) + + return data + + +def _fix_markings(markings): + + for granular_marking in markings: + refs = granular_marking.get("marking_ref", []) + selectors = granular_marking.get("selectors", []) + + if not isinstance(refs, list): + granular_marking["marking_ref"] = [refs] + + if not isinstance(selectors, list): + granular_marking["selectors"] = [selectors] + + +def _group_by(markings): + + key = "marking_ref" + retrieve = "selectors" + + map_ = collections.defaultdict(set) + + for granular_marking in markings: + for data in granular_marking.get(key, []): + map_[data].update(granular_marking.get(retrieve)) + + granular_markings = \ + [ + {"selectors": sorted(selectors), "marking_ref": ref} + for ref, selectors in six.iteritems(map_) + ] + + return granular_markings + + +def compress_markings(tlo): + + if not tlo.get("granular_markings"): + return + + granular_markings = tlo.get("granular_markings") + + _fix_markings(granular_markings) + + tlo["granular_markings"] = _group_by(granular_markings) + + +def expand_markings(tlo): + + if not tlo.get("granular_markings"): + return + + granular_markings = tlo.get("granular_markings") + + _fix_markings(granular_markings) + + expanded = list() + + for marking in granular_markings: + selectors = marking.get("selectors", []) + marking_ref = marking.get("marking_ref", []) + + expanded.extend( + [ + {"selectors": [sel], "marking_ref": ref} + for sel in selectors + for ref in marking_ref + ] + ) + + tlo["granular_markings"] = expanded + + +def build_granular_marking(granular_marking): + tlo = {"granular_markings": [granular_marking]} + + expand_markings(tlo) + + return tlo + + +def iterpath(obj, path=None): + """ + Generator which walks the input ``obj`` model. Each iteration yields a + tuple containing a list of ancestors and the property value. + + Args: + obj: A TLO object. + path: None, used recursively to store ancestors. + + Example: + >>> for item in iterpath(tlo): + >>> print(item) + (['type'], 'campaign') + ... + (['cybox', 'objects', '[0]', 'hashes', 'sha1'], 'cac35ec206d868b7d7cb0b55f31d9425b075082b') + + Returns: + tuple: Containing two items: a list of ancestors and the property value. + + """ + if path is None: + path = [] + + for varname, varobj in iter(sorted(six.iteritems(obj))): + path.append(varname) + yield (path, varobj) + + if isinstance(varobj, dict): + + for item in iterpath(varobj, path): + yield item + + elif isinstance(varobj, list): + + for item in varobj: + index = "[{0}]".format(varobj.index(item)) + path.append(index) + + yield (path, item) + + if isinstance(item, dict): + for descendant in iterpath(item, path): + yield descendant + + path.pop() + + path.pop() + + +def get_selector(obj, prop): + """ + Function that creates a selector based on ``prop``. + + Args: + obj: A TLO object. + prop: A property of the TLO object. + + Note: + Must supply the actual value inside the structure. Since some + limitations exist with Python interning methods, checking for object + location is for now the option to assert the data. + + Example: + >>> selector = get_selector(tlo, tlo["cybox"]["objects"][0]["file_name"]) + >>> print(selector) + ["cybox.objects.[0].file_name"] + + Returns: + list: A list with one selector that asserts the supplied property. + Empty list if it was unable to find the property. + + """ + selector = [] + + for ancestors, value in iterpath(obj): + if value is prop: + path = ".".join(ancestors) + selector.append(path) + + return selector