Initial marking code.

stix2.1
Emmanuelle Vargas-Gonzalez 2017-06-09 14:21:42 -04:00
parent 069c82abf1
commit 1b7695c4f6
4 changed files with 802 additions and 0 deletions

View File

@ -0,0 +1,197 @@
"""
Python STIX 2.0 Data Markings API.
These high level functions will operate on both object level markings and
granular markings unless otherwise noted in each of the functions.
"""
from stix2.markings import utils
from stix2.markings import granular_markings
from stix2.markings import object_markings
def get_markings(obj, selectors, inherited=False, descendants=False):
"""
Get all markings associated to the field(s).
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
inherited: If True, include object level markings and granular markings
inherited relative to the field(s).
descendants: If True, include granular markings applied to any children
relative to the field(s).
Returns:
list: Marking IDs that matched the selectors expression.
Note:
If ``selectors`` is None, operation will be performed only on object
level markings.
"""
if selectors is None:
return object_markings.get_markings(obj)
results = granular_markings.get_markings(
obj,
selectors,
inherited,
descendants
)
if inherited:
results.extend(object_markings.get_markings(obj))
return list(set(results))
def set_markings(obj, selectors, marking):
"""
Removes all markings associated with selectors and appends a new granular
marking. Refer to `clear_markings` and `add_markings` for details.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
object_markings.set_markings(obj, marking)
else:
granular_markings.set_markings(obj, selectors, marking)
def remove_markings(obj, selectors, marking):
"""
Removes granular_marking from the granular_markings collection.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
Raises:
AssertionError: If `selectors` or `marking` fail data validation. Also
if markings to remove are not found on the provided TLO.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
object_markings.remove_markings(obj, marking)
else:
granular_markings.remove_markings(obj, selectors, marking)
def add_markings(obj, selectors, marking):
"""
Appends a granular_marking to the granular_markings collection.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
Raises:
AssertionError: If `selectors` or `marking` fail data validation.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
object_markings.add_markings(obj, marking)
else:
granular_markings.add_markings(obj, selectors, marking)
def clear_markings(obj, selectors):
"""
Removes all granular_marking associated with the selectors.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
object_markings.clear_markings(obj)
else:
granular_markings.clear_markings(obj, selectors)
def is_marked(obj, selectors, marking=None, inherited=False, descendants=False):
"""
Checks if field(s) is marked by any marking or by specific marking(s).
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
inherited: If True, include object level markings and granular markings
inherited to determine if the field(s) is/are marked.
descendants: If True, include granular markings applied to any children
of the given selector to determine if the field(s) is/are marked.
Returns:
bool: True if ``selectors`` is found on internal TLO collection.
False otherwise.
Note:
When a list of marking IDs is provided, if ANY of the provided marking
IDs matches, True is returned.
If ``selectors`` is None, operation will be performed only on object
level markings.
"""
if selectors is None:
return object_markings.is_marked(obj, marking)
result = granular_markings.is_marked(
obj,
selectors,
marking,
inherited,
descendants
)
if inherited:
granular_marks = granular_markings.get_markings(obj, selectors)
object_marks = object_markings.get_markings(obj)
if granular_marks:
result = granular_markings.is_marked(
obj,
selectors,
granular_marks,
inherited,
descendants
)
result = result or object_markings.is_marked(obj, object_marks)
return result

View File

@ -0,0 +1,237 @@
from stix2.markings import utils
def get_markings(obj, selectors, inherited=False, descendants=False):
"""
Get all markings associated to the field(s).
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
inherited: If True, include markings inherited relative to the field(s).
descendants: If True, include granular markings applied to any children
relative to the field(s).
Returns:
list: Marking IDs that matched the selectors expression.
"""
selectors = utils.fix_value(selectors)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings", [])
if not granular_markings:
return []
results = set()
for marking in granular_markings:
for user_selector in selectors:
for marking_selector in marking.get("selectors", []):
if any([(user_selector == marking_selector), # Catch explicit selectors.
(user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors.
(marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors
refs = marking.get("marking_ref", [])
results.update([refs])
return list(results)
def set_markings(obj, selectors, marking):
"""
Removes all markings associated with selectors and appends a new granular
marking. Refer to `clear_markings` and `add_markings` for details.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
"""
clear_markings(obj, selectors)
add_markings(obj, selectors, marking)
def remove_markings(obj, selectors, marking):
"""
Removes granular_marking from the granular_markings collection.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
Raises:
AssertionError: If `selectors` or `marking` fail data validation. Also
if markings to remove are not found on the provided TLO.
"""
selectors = utils.fix_value(selectors)
utils.validate(obj, selectors, marking)
utils.expand_markings(obj)
granular_markings = obj.get("granular_markings")
if not granular_markings:
return
tlo = utils.build_granular_marking(
{"selectors": selectors, "marking_ref": marking}
)
remove = tlo.get("granular_markings", [])
if not any(marking in granular_markings for marking in remove):
raise AssertionError("Unable to remove Granular Marking(s) from"
" internal collection. Marking(s) not found...")
obj["granular_markings"] = [
m for m in granular_markings if m not in remove
]
utils.compress_markings(obj)
if not obj.get("granular_markings"):
obj.pop("granular_markings")
def add_markings(obj, selectors, marking):
"""
Appends a granular_marking to the granular_markings collection.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
Raises:
AssertionError: If `selectors` or `marking` fail data validation.
"""
selectors = utils.fix_value(selectors)
utils.validate(obj, selectors, marking)
granular_marking = {"selectors": sorted(selectors), "marking_ref": marking}
if not obj.get("granular_markings"):
obj["granular_markings"] = list()
obj["granular_markings"].append(granular_marking)
utils.expand_markings(obj)
utils.compress_markings(obj)
def clear_markings(obj, selectors):
"""
Removes all granular_marking associated with the selectors.
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
Raises:
AssertionError: If `selectors` or `marking` fail data validation. Also
if markings to remove are not found on the provided TLO.
"""
selectors = utils.fix_value(selectors)
utils.validate(obj, selectors)
utils.expand_markings(obj)
granular_markings = obj.get("granular_markings")
if not granular_markings:
return
tlo = utils.build_granular_marking(
{"selectors": selectors, "marking_ref": ["N/A"]}
)
clear = tlo.get("granular_markings", [])
if not any(clear_selector in tlo_selectors.get("selectors", [])
for tlo_selectors in granular_markings
for clear_marking in clear
for clear_selector in clear_marking.get("selectors", [])
):
raise AssertionError("Unable to clear Granular Marking(s) from"
" internal collection. Selector(s) not found...")
for granular_marking in granular_markings:
for s in selectors:
if s in granular_marking.get("selectors", []):
marking_refs = granular_marking.get("marking_ref")
if marking_refs:
granular_marking["marking_ref"] = list()
utils.compress_markings(obj)
if not obj.get("granular_markings"):
obj.pop("granular_markings")
def is_marked(obj, selectors, marking=None, inherited=False, descendants=False):
"""
Checks if field is marked by any marking or by specific marking(s).
Args:
obj: A TLO object.
selectors: string or list of selectors strings relative to the TLO in
which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
field(s) selected by `selectors`.
inherited: If True, return markings inherited from the given selector.
descendants: If True, return granular markings applied to any children
of the given selector.
Returns:
bool: True if ``selectors`` is found on internal TLO collection.
False otherwise.
Note:
When a list of marking IDs is provided, if ANY of the provided marking
IDs matches, True is returned.
"""
selectors = utils.fix_value(selectors)
marking = utils.fix_value(marking)
utils.validate(obj, selectors, marking)
granular_markings = obj.get("granular_markings", [])
marked = False
markings = set()
for granular_marking in granular_markings:
for user_selector in selectors:
for marking_selector in granular_marking.get("selectors", []):
if any([(user_selector == marking_selector), # Catch explicit selectors.
(user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors.
(marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors
marking_ref = granular_marking.get("marking_ref", "")
if marking and any(x == marking_ref for x in marking):
markings.update([marking_ref])
marked = True
if marking:
# All user-provided markings must be found.
return markings.issuperset(set(marking))
return marked

View File

@ -0,0 +1,140 @@
import six
from stix2.markings import utils
def get_markings(obj):
"""
Get all object level markings from the given TLO object.
Args:
obj: A TLO object.
Returns:
list: Marking IDs contained in the TLO.
"""
object_markings = obj.get("object_marking_refs", [])
if not object_markings:
return []
elif isinstance(object_markings, six.string_types):
return [object_markings]
else:
return object_markings
def add_markings(obj, marking):
"""
Appends an object level marking to the object_marking_refs collection.
Args:
obj: A TLO object.
marking: identifier or list of marking identifiers that apply to the
TLO object.
Raises:
AssertionError: If `marking` fail data validation.
"""
marking = utils.convert_to_list(marking)
utils.validate(obj, marking=marking)
if not obj.get("object_marking_refs"):
obj["object_marking_refs"] = list()
object_markings = set(obj.get("object_marking_refs") + marking)
obj["object_marking_refs"] = list(object_markings)
def remove_markings(obj, marking):
"""
Removes object level marking from the object_marking_refs collection.
Args:
obj: A TLO object.
marking: identifier or list of marking identifiers that apply to the
TLO object.
Raises:
AssertionError: If `marking` fail data validation. Also
if markings to remove are not found on the provided TLO.
"""
marking = utils.convert_to_list(marking)
utils.validate(obj, marking=marking)
object_markings = obj.get("object_marking_refs", [])
if not object_markings:
return []
if any(x not in obj["object_marking_refs"] for x in marking):
raise AssertionError("Unable to remove Object Level Marking(s) from "
"internal collection. Marking(s) not found...")
obj["object_marking_refs"] = [x for x in object_markings
if x not in marking]
if not obj.get("object_marking_refs"):
obj.pop("object_marking_refs")
def set_markings(obj, marking):
"""
Removes all object level markings and appends new object level markings to
the collection. Refer to `clear_markings` and `add_markings` for details.
Args:
obj: A TLO object.
marking: identifier or list of marking identifiers that apply to the
TLO object.
"""
utils.validate(obj, marking=marking)
clear_markings(obj)
add_markings(obj, marking)
def clear_markings(obj):
"""
Removes all object level markings from the object_marking_refs collection.
Args:
obj: A TLO object.
"""
try:
del obj["object_marking_refs"]
except KeyError:
raise AssertionError("Unable to clear Object Marking(s) from internal"
" collection. No Markings in object...")
def is_marked(obj, marking=None):
"""
Checks if TLO is marked by any marking or by specific marking(s).
Args:
obj: A TLO object.
marking: identifier or list of marking identifiers that apply to the
TLO object.
Returns:
bool: True if TLO has object level markings. False otherwise.
Note:
When a list of marking IDs is provided, if ANY of the provided marking
IDs matches, True is returned.
"""
marking = utils.convert_to_list(marking)
object_markings = obj.get("object_marking_refs", [])
if marking:
return any(x in object_markings for x in marking)
else:
return bool(object_markings)

View File

@ -0,0 +1,228 @@
import collections
import six
def evaluate_expression(obj, selector):
for items, value in iterpath(obj):
path = ".".join(items)
if path == selector and value:
return [value]
return []
def validate_selector(obj, selector):
results = list(evaluate_expression(obj, selector))
if len(results) >= 1:
return True
def validate_markings(marking):
if isinstance(marking, six.string_types):
if not marking:
return False
else:
return True
elif isinstance(marking, list) and len(marking) >= 1:
for m in marking:
if not m:
return False
elif not isinstance(m, six.string_types):
return False
return True
else:
return False
def validate(obj, selectors=None, marking=None):
if selectors is not None:
assert selectors
for s in selectors:
assert validate_selector(obj, s)
if marking is not None:
assert validate_markings(marking)
def convert_to_list(data):
if data is not None:
if isinstance(data, list):
return data
else:
return [data]
def fix_value(data):
data = convert_to_list(data)
return data
def _fix_markings(markings):
for granular_marking in markings:
refs = granular_marking.get("marking_ref", [])
selectors = granular_marking.get("selectors", [])
if not isinstance(refs, list):
granular_marking["marking_ref"] = [refs]
if not isinstance(selectors, list):
granular_marking["selectors"] = [selectors]
def _group_by(markings):
key = "marking_ref"
retrieve = "selectors"
map_ = collections.defaultdict(set)
for granular_marking in markings:
for data in granular_marking.get(key, []):
map_[data].update(granular_marking.get(retrieve))
granular_markings = \
[
{"selectors": sorted(selectors), "marking_ref": ref}
for ref, selectors in six.iteritems(map_)
]
return granular_markings
def compress_markings(tlo):
if not tlo.get("granular_markings"):
return
granular_markings = tlo.get("granular_markings")
_fix_markings(granular_markings)
tlo["granular_markings"] = _group_by(granular_markings)
def expand_markings(tlo):
if not tlo.get("granular_markings"):
return
granular_markings = tlo.get("granular_markings")
_fix_markings(granular_markings)
expanded = list()
for marking in granular_markings:
selectors = marking.get("selectors", [])
marking_ref = marking.get("marking_ref", [])
expanded.extend(
[
{"selectors": [sel], "marking_ref": ref}
for sel in selectors
for ref in marking_ref
]
)
tlo["granular_markings"] = expanded
def build_granular_marking(granular_marking):
tlo = {"granular_markings": [granular_marking]}
expand_markings(tlo)
return tlo
def iterpath(obj, path=None):
"""
Generator which walks the input ``obj`` model. Each iteration yields a
tuple containing a list of ancestors and the property value.
Args:
obj: A TLO object.
path: None, used recursively to store ancestors.
Example:
>>> for item in iterpath(tlo):
>>> print(item)
(['type'], 'campaign')
...
(['cybox', 'objects', '[0]', 'hashes', 'sha1'], 'cac35ec206d868b7d7cb0b55f31d9425b075082b')
Returns:
tuple: Containing two items: a list of ancestors and the property value.
"""
if path is None:
path = []
for varname, varobj in iter(sorted(six.iteritems(obj))):
path.append(varname)
yield (path, varobj)
if isinstance(varobj, dict):
for item in iterpath(varobj, path):
yield item
elif isinstance(varobj, list):
for item in varobj:
index = "[{0}]".format(varobj.index(item))
path.append(index)
yield (path, item)
if isinstance(item, dict):
for descendant in iterpath(item, path):
yield descendant
path.pop()
path.pop()
def get_selector(obj, prop):
"""
Function that creates a selector based on ``prop``.
Args:
obj: A TLO object.
prop: A property of the TLO object.
Note:
Must supply the actual value inside the structure. Since some
limitations exist with Python interning methods, checking for object
location is for now the option to assert the data.
Example:
>>> selector = get_selector(tlo, tlo["cybox"]["objects"][0]["file_name"])
>>> print(selector)
["cybox.objects.[0].file_name"]
Returns:
list: A list with one selector that asserts the supplied property.
Empty list if it was unable to find the property.
"""
selector = []
for ancestors, value in iterpath(obj):
if value is prop:
path = ".".join(ancestors)
selector.append(path)
return selector