Merge pull request #46 from oasis-open/markings

Marking Support
stix2.1
Greg Back 2017-09-05 19:08:45 +00:00 committed by GitHub
commit adac43708b
22 changed files with 2618 additions and 126 deletions

View File

@ -2,7 +2,6 @@ sudo: false
language: python
cache: pip
python:
- "2.6"
- "2.7"
- "3.3"
- "3.4"

View File

@ -47,7 +47,6 @@ setup(
keywords="stix stix2 json cti cyber threat intelligence",
packages=find_packages(),
install_requires=[
'ordereddict ; python_version<"2.7"',
'python-dateutil',
'pytz',
'requests',

View File

@ -42,5 +42,5 @@ from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject,
Identity, Indicator, IntrusionSet, Malware, ObservedData,
Report, ThreatActor, Tool, Vulnerability)
from .sro import Relationship, Sighting
from .utils import get_dict
from .utils import get_dict, new_version, revoke
from .version import __version__

View File

@ -10,10 +10,11 @@ from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ExtraPropertiesError, ImmutableError,
InvalidObjRefError, InvalidValueError,
MissingPropertiesError,
MutuallyExclusivePropertiesError, RevokeError,
UnmodifiablePropertyError)
from .utils import (NOW, find_property_index, format_datetime, get_timestamp,
parse_into_datetime)
MutuallyExclusivePropertiesError)
from .markings.utils import validate
from .utils import NOW, find_property_index, format_datetime, get_timestamp
from .utils import new_version as _new_version
from .utils import revoke as _revoke
__all__ = ['STIXJSONEncoder', '_STIXBase']
@ -85,8 +86,7 @@ class _STIXBase(collections.Mapping):
def _check_object_constraints(self):
for m in self.get("granular_markings", []):
# TODO: check selectors
pass
validate(self, m.get("selectors"))
def __init__(self, allow_custom=False, **kwargs):
cls = self.__class__
@ -173,30 +173,10 @@ class _STIXBase(collections.Mapping):
# Versioning API
def new_version(self, **kwargs):
unchangable_properties = []
if self.get("revoked"):
raise RevokeError("new_version")
new_obj_inner = copy.deepcopy(self._inner)
properties_to_change = kwargs.keys()
for prop in ["created", "created_by_ref", "id", "type"]:
if prop in properties_to_change:
unchangable_properties.append(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
cls = type(self)
if 'modified' not in kwargs:
kwargs['modified'] = get_timestamp()
else:
new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond')
if new_modified_property < self.modified:
raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.")
new_obj_inner.update(kwargs)
return cls(**new_obj_inner)
return _new_version(self, **kwargs)
def revoke(self):
if self.get("revoked"):
raise RevokeError("revoke")
return self.new_version(revoked=True)
return _revoke(self)
class _Observable(_STIXBase):

View File

@ -1,9 +1,6 @@
"""STIX 2 Common Data Types and Properties"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from collections import OrderedDict
from .base import _STIXBase
from .properties import (HashesProperty, IDProperty, ListProperty, Property,

View File

@ -1,9 +1,6 @@
"""STIX 2.0 Objects that are neither SDOs nor SROs"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from collections import OrderedDict
from . import exceptions
from .base import _STIXBase

View File

@ -157,3 +157,29 @@ class ParseError(STIXError, ValueError):
def __init__(self, msg):
super(ParseError, self).__init__(msg)
class InvalidSelectorError(STIXError, AssertionError):
"""Granular Marking selector violation. The selector must resolve into an existing STIX object property."""
def __init__(self, cls, key):
super(InvalidSelectorError, self).__init__()
self.cls = cls
self.key = key
def __str__(self):
msg = "Selector {0} in {1} is not valid!"
return msg.format(self.key, self.cls.__class__.__name__)
class MarkingNotFoundError(STIXError, AssertionError):
"""Marking violation. The marking reference must be present in SDO or SRO."""
def __init__(self, cls, key):
super(MarkingNotFoundError, self).__init__()
self.cls = cls
self.key = key
def __str__(self):
msg = "Marking {0} was not found in {1}!"
return msg.format(self.key, self.cls.__class__.__name__)

214
stix2/markings/__init__.py Normal file
View File

@ -0,0 +1,214 @@
"""
Python STIX 2.0 Data Markings API.
These high level functions will operate on both object level markings and
granular markings unless otherwise noted in each of the functions.
"""
from stix2.markings import granular_markings, object_markings
def get_markings(obj, selectors=None, inherited=False, descendants=False):
"""
Get all markings associated to the field(s).
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
inherited: If True, include object level markings and granular markings
inherited relative to the properties.
descendants: If True, include granular markings applied to any children
relative to the properties.
Returns:
list: Marking identifiers that matched the selectors expression.
Note:
If ``selectors`` is None, operation will be performed only on object
level markings.
"""
if selectors is None:
return object_markings.get_markings(obj)
results = granular_markings.get_markings(
obj,
selectors,
inherited,
descendants
)
if inherited:
results.extend(object_markings.get_markings(obj))
return list(set(results))
def set_markings(obj, marking, selectors=None):
"""
Removes all markings associated with selectors and appends a new granular
marking. Refer to `clear_markings` and `add_markings` for details.
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
Returns:
A new version of the given SDO or SRO with specified markings removed
and new ones added.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
return object_markings.set_markings(obj, marking)
else:
return granular_markings.set_markings(obj, marking, selectors)
def remove_markings(obj, marking, selectors=None):
"""
Removes granular_marking from the granular_markings collection.
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
Raises:
InvalidSelectorError: If `selectors` fail validation.
MarkingNotFoundError: If markings to remove are not found on
the provided SDO or SRO.
Returns:
A new version of the given SDO or SRO with specified markings removed.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
return object_markings.remove_markings(obj, marking)
else:
return granular_markings.remove_markings(obj, marking, selectors)
def add_markings(obj, marking, selectors=None):
"""
Appends a granular_marking to the granular_markings collection.
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
Raises:
InvalidSelectorError: If `selectors` fail validation.
Returns:
A new version of the given SDO or SRO with specified markings added.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
return object_markings.add_markings(obj, marking)
else:
return granular_markings.add_markings(obj, marking, selectors)
def clear_markings(obj, selectors=None):
"""
Removes all granular_marking associated with the selectors.
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the field(s) appear(s).
Raises:
InvalidSelectorError: If `selectors` fail validation.
MarkingNotFoundError: If markings to remove are not found on
the provided SDO or SRO.
Returns:
A new version of the given SDO or SRO with specified markings cleared.
Note:
If ``selectors`` is None, operations will be performed on object level
markings. Otherwise on granular markings.
"""
if selectors is None:
return object_markings.clear_markings(obj)
else:
return granular_markings.clear_markings(obj, selectors)
def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=False):
"""
Checks if field(s) is marked by any marking or by specific marking(s).
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the field(s) appear(s).
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
inherited: If True, include object level markings and granular markings
inherited to determine if the properties is/are marked.
descendants: If True, include granular markings applied to any children
of the given selector to determine if the properties is/are marked.
Returns:
bool: True if ``selectors`` is found on internal SDO or SRO collection.
False otherwise.
Note:
When a list of marking identifiers is provided, if ANY of the provided
marking identifiers match, True is returned.
If ``selectors`` is None, operation will be performed only on object
level markings.
"""
if selectors is None:
return object_markings.is_marked(obj, marking)
result = granular_markings.is_marked(
obj,
marking,
selectors,
inherited,
descendants
)
if inherited:
granular_marks = granular_markings.get_markings(obj, selectors)
object_marks = object_markings.get_markings(obj)
if granular_marks:
result = granular_markings.is_marked(
obj,
granular_marks,
selectors,
inherited,
descendants
)
result = result or object_markings.is_marked(obj, object_marks)
return result

View File

@ -0,0 +1,273 @@
from stix2 import exceptions
from stix2.markings import utils
from stix2.utils import new_version
def get_markings(obj, selectors, inherited=False, descendants=False):
"""
Get all markings associated to with the properties.
Args:
obj: An SDO or SRO object.
selectors: string or list of selector strings relative to the SDO or
SRO in which the properties appear.
inherited: If True, include markings inherited relative to the
properties.
descendants: If True, include granular markings applied to any children
relative to the properties.
Raises:
InvalidSelectorError: If `selectors` fail validation.
Returns:
list: Marking identifiers that matched the selectors expression.
"""
selectors = utils.convert_to_list(selectors)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings", [])
if not granular_markings:
return []
results = set()
for marking in granular_markings:
for user_selector in selectors:
for marking_selector in marking.get("selectors", []):
if any([(user_selector == marking_selector), # Catch explicit selectors.
(user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors.
(marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors
refs = marking.get("marking_ref", [])
results.update([refs])
return list(results)
def set_markings(obj, marking, selectors):
"""
Removes all markings associated with selectors and appends a new granular
marking. Refer to `clear_markings` and `add_markings` for details.
Args:
obj: An SDO or SRO object.
selectors: string or list of selector strings relative to the SDO or
SRO in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
Returns:
A new version of the given SDO or SRO with specified markings removed
and new ones added.
"""
obj = clear_markings(obj, selectors)
return add_markings(obj, marking, selectors)
def remove_markings(obj, marking, selectors):
"""
Removes granular_marking from the granular_markings collection.
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
Raises:
InvalidSelectorError: If `selectors` fail validation.
MarkingNotFoundError: If markings to remove are not found on
the provided SDO or SRO.
Returns:
A new version of the given SDO or SRO with specified markings removed.
"""
selectors = utils.convert_to_list(selectors)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings")
if not granular_markings:
return obj
granular_markings = utils.expand_markings(granular_markings)
if isinstance(marking, list):
to_remove = []
for m in marking:
to_remove.append({"marking_ref": m, "selectors": selectors})
else:
to_remove = [{"marking_ref": marking, "selectors": selectors}]
remove = utils.build_granular_marking(to_remove).get("granular_markings")
if not any(marking in granular_markings for marking in remove):
raise exceptions.MarkingNotFoundError(obj, remove)
granular_markings = [
m for m in granular_markings if m not in remove
]
granular_markings = utils.compress_markings(granular_markings)
if granular_markings:
return new_version(obj, granular_markings=granular_markings)
else:
return new_version(obj, granular_markings=None)
def add_markings(obj, marking, selectors):
"""
Appends a granular_marking to the granular_markings collection.
Args:
obj: An SDO or SRO object.
selectors: list of type string, selectors must be relative to the TLO
in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
Raises:
InvalidSelectorError: If `selectors` fail validation.
Returns:
A new version of the given SDO or SRO with specified markings added.
"""
selectors = utils.convert_to_list(selectors)
utils.validate(obj, selectors)
if isinstance(marking, list):
granular_marking = []
for m in marking:
granular_marking.append({"marking_ref": m, "selectors": sorted(selectors)})
else:
granular_marking = [{"marking_ref": marking, "selectors": sorted(selectors)}]
if obj.get("granular_markings"):
granular_marking.extend(obj.get("granular_markings"))
granular_marking = utils.expand_markings(granular_marking)
granular_marking = utils.compress_markings(granular_marking)
return new_version(obj, granular_markings=granular_marking)
def clear_markings(obj, selectors):
"""
Removes all granular_markings associated with the selectors.
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
Raises:
InvalidSelectorError: If `selectors` fail validation.
MarkingNotFoundError: If markings to remove are not found on
the provided SDO or SRO.
Returns:
A new version of the given SDO or SRO with specified markings cleared.
"""
selectors = utils.convert_to_list(selectors)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings")
if not granular_markings:
return obj
granular_markings = utils.expand_markings(granular_markings)
sdo = utils.build_granular_marking(
[{"selectors": selectors, "marking_ref": "N/A"}]
)
clear = sdo.get("granular_markings", [])
if not any(clear_selector in sdo_selectors.get("selectors", [])
for sdo_selectors in granular_markings
for clear_marking in clear
for clear_selector in clear_marking.get("selectors", [])
):
raise exceptions.MarkingNotFoundError(obj, clear)
for granular_marking in granular_markings:
for s in selectors:
if s in granular_marking.get("selectors", []):
marking_refs = granular_marking.get("marking_ref")
if marking_refs:
granular_marking["marking_ref"] = ""
granular_markings = utils.compress_markings(granular_markings)
if granular_markings:
return new_version(obj, granular_markings=granular_markings)
else:
return new_version(obj, granular_markings=None)
def is_marked(obj, marking=None, selectors=None, inherited=False, descendants=False):
"""
Checks if field is marked by any marking or by specific marking(s).
Args:
obj: An SDO or SRO object.
selectors: string or list of selectors strings relative to the SDO or
SRO in which the properties appear.
marking: identifier or list of marking identifiers that apply to the
properties selected by `selectors`.
inherited: If True, return markings inherited from the given selector.
descendants: If True, return granular markings applied to any children
of the given selector.
Raises:
InvalidSelectorError: If `selectors` fail validation.
Returns:
bool: True if ``selectors`` is found on internal SDO or SRO collection.
False otherwise.
Note:
When a list of marking identifiers is provided, if ANY of the provided
marking identifiers match, True is returned.
"""
if selectors is None:
raise TypeError("Required argument 'selectors' must be provided")
selectors = utils.convert_to_list(selectors)
marking = utils.convert_to_list(marking)
utils.validate(obj, selectors)
granular_markings = obj.get("granular_markings", [])
marked = False
markings = set()
for granular_marking in granular_markings:
for user_selector in selectors:
for marking_selector in granular_marking.get("selectors", []):
if any([(user_selector == marking_selector), # Catch explicit selectors.
(user_selector.startswith(marking_selector) and inherited), # Catch inherited selectors.
(marking_selector.startswith(user_selector) and descendants)]): # Catch descendants selectors
marking_ref = granular_marking.get("marking_ref", "")
if marking and any(x == marking_ref for x in marking):
markings.update([marking_ref])
marked = True
if marking:
# All user-provided markings must be found.
return markings.issuperset(set(marking))
return marked

View File

@ -0,0 +1,130 @@
from stix2 import exceptions
from stix2.markings import utils
from stix2.utils import new_version
def get_markings(obj):
"""
Get all object level markings from the given SDO or SRO object.
Args:
obj: A SDO or SRO object.
Returns:
list: Marking identifiers contained in the SDO or SRO. Empty list if no
markings are present in `object_marking_refs`.
"""
return obj.get("object_marking_refs", [])
def add_markings(obj, marking):
"""
Appends an object level marking to the object_marking_refs collection.
Args:
obj: A SDO or SRO object.
marking: identifier or list of identifiers to apply SDO or SRO object.
Returns:
A new version of the given SDO or SRO with specified markings added.
"""
marking = utils.convert_to_list(marking)
object_markings = set(obj.get("object_marking_refs", []) + marking)
return new_version(obj, object_marking_refs=list(object_markings))
def remove_markings(obj, marking):
"""
Removes object level marking from the object_marking_refs collection.
Args:
obj: A SDO or SRO object.
marking: identifier or list of identifiers that apply to the
SDO or SRO object.
Raises:
MarkingNotFoundError: If markings to remove are not found on
the provided SDO or SRO.
Returns:
A new version of the given SDO or SRO with specified markings removed.
"""
marking = utils.convert_to_list(marking)
object_markings = obj.get("object_marking_refs", [])
if not object_markings:
return obj
if any(x not in obj["object_marking_refs"] for x in marking):
raise exceptions.MarkingNotFoundError(obj, marking)
new_markings = [x for x in object_markings if x not in marking]
if new_markings:
return new_version(obj, object_marking_refs=new_markings)
else:
return new_version(obj, object_marking_refs=None)
def set_markings(obj, marking):
"""
Removes all object level markings and appends new object level markings to
the collection. Refer to `clear_markings` and `add_markings` for details.
Args:
obj: A SDO or SRO object.
marking: identifier or list of identifiers to apply in the
SDO or SRO object.
Returns:
A new version of the given SDO or SRO with specified markings removed
and new ones added.
"""
return add_markings(clear_markings(obj), marking)
def clear_markings(obj):
"""
Removes all object level markings from the object_marking_refs collection.
Args:
obj: A SDO or SRO object.
Returns:
A new version of the given SDO or SRO with object_marking_refs cleared.
"""
return new_version(obj, object_marking_refs=None)
def is_marked(obj, marking=None):
"""
Checks if SDO or SRO is marked by any marking or by specific marking(s).
Args:
obj: A SDO or SRO object.
marking: identifier or list of marking identifiers that apply to the
SDO or SRO object.
Returns:
bool: True if SDO or SRO has object level markings. False otherwise.
Note:
When an identifier or list of identifiers is provided, if ANY of the
provided marking refs match, True is returned.
"""
marking = utils.convert_to_list(marking)
object_markings = obj.get("object_marking_refs", [])
if marking:
return any(x in object_markings for x in marking)
else:
return bool(object_markings)

229
stix2/markings/utils.py Normal file
View File

@ -0,0 +1,229 @@
import collections
import six
from stix2 import exceptions
def _evaluate_expression(obj, selector):
"""
Walks an SDO or SRO generating selectors to match against ``selector``. If
a match is found and the the value of this property is present in the
objects. Matching value of the property will be returned.
Args:
obj: An SDO or SRO object.
selector: A string following the selector syntax.
Returns:
list: Values contained in matching property. Otherwise empty list.
"""
for items, value in iterpath(obj):
path = ".".join(items)
if path == selector and value:
return [value]
return []
def _validate_selector(obj, selector):
"""Internal method to evaluate each selector."""
results = list(_evaluate_expression(obj, selector))
if len(results) >= 1:
return True
def validate(obj, selectors):
"""Given an SDO or SRO, check that each selector is valid."""
if selectors:
for s in selectors:
if not _validate_selector(obj, s):
raise exceptions.InvalidSelectorError(obj, s)
return
raise exceptions.InvalidSelectorError(obj, selectors)
def convert_to_list(data):
"""Convert input into a list for further processing."""
if data is not None:
if isinstance(data, list):
return data
else:
return [data]
def compress_markings(granular_markings):
"""
Compress granular markings list. If there is more than one marking
identifier matches. It will collapse into a single granular marking.
Examples:
Input:
[
{
"selectors": [
"description"
],
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
},
{
"selectors": [
"name"
],
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
}
]
Output:
[
{
"selectors": [
"description",
"name"
],
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
}
]
Args:
granular_markings: The granular markings list property present in a
SDO or SRO.
Returns:
list: A list with all markings collapsed.
"""
if not granular_markings:
return
map_ = collections.defaultdict(set)
for granular_marking in granular_markings:
if granular_marking.get("marking_ref"):
map_[granular_marking.get("marking_ref")].update(granular_marking.get("selectors"))
compressed = \
[
{"marking_ref": marking_ref, "selectors": sorted(selectors)}
for marking_ref, selectors in six.iteritems(map_)
]
return compressed
def expand_markings(granular_markings):
"""
Expands granular markings list. If there is more than one selector per
granular marking. It will be expanded using the same marking_ref.
Examples:
Input:
[
{
"selectors": [
"description",
"name"
],
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
}
]
Output:
[
{
"selectors": [
"description"
],
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
},
{
"selectors": [
"name"
],
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
}
]
Args:
granular_markings: The granular markings list property present in a
SDO or SRO.
Returns:
list: A list with all markings expanded.
"""
expanded = []
for marking in granular_markings:
selectors = marking.get("selectors")
marking_ref = marking.get("marking_ref")
expanded.extend(
[
{"marking_ref": marking_ref, "selectors": [selector]}
for selector in selectors
]
)
return expanded
def build_granular_marking(granular_marking):
"""Returns a dictionary with the required structure for a granular
marking"""
return {"granular_markings": expand_markings(granular_marking)}
def iterpath(obj, path=None):
"""
Generator which walks the input ``obj`` model. Each iteration yields a
tuple containing a list of ancestors and the property value.
Args:
obj: An SDO or SRO object.
path: None, used recursively to store ancestors.
Example:
>>> for item in iterpath(obj):
>>> print(item)
(['type'], 'campaign')
...
(['cybox', 'objects', '[0]', 'hashes', 'sha1'], 'cac35ec206d868b7d7cb0b55f31d9425b075082b')
Returns:
tuple: Containing two items: a list of ancestors and the
property value.
"""
if path is None:
path = []
for varname, varobj in iter(sorted(six.iteritems(obj))):
path.append(varname)
yield (path, varobj)
if isinstance(varobj, dict):
for item in iterpath(varobj, path):
yield item
elif isinstance(varobj, list):
for item in varobj:
index = "[{0}]".format(varobj.index(item))
path.append(index)
yield (path, item)
if isinstance(item, dict):
for descendant in iterpath(item, path):
yield descendant
path.pop()
path.pop()

View File

@ -5,10 +5,7 @@ embedded in Email Message objects, inherit from _STIXBase instead of Observable
and do not have a '_type' attribute.
"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from collections import OrderedDict
from .base import _Extension, _Observable, _STIXBase
from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,

View File

@ -312,6 +312,7 @@ class ReferenceProperty(Property):
def clean(self, value):
if isinstance(value, _STIXBase):
value = value.id
value = str(value)
if self.type:
if not value.startswith(self.type):
raise ValueError("must start with '{0}'.".format(self.type))

View File

@ -1,9 +1,6 @@
"""STIX 2.0 Domain Objects"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from collections import OrderedDict
import stix2

View File

@ -1,9 +1,6 @@
"""STIX 2.0 Relationship Objects."""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from collections import OrderedDict
from .base import _STIXBase
from .common import ExternalReference, GranularMarking

View File

@ -20,6 +20,26 @@ TOOL_ID = "tool--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f"
SIGHTING_ID = "sighting--bfbc19db-ec35-4e45-beed-f8bde2a772fb"
VULNERABILITY_ID = "vulnerability--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061"
MARKING_IDS = [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"marking-definition--443eb5c3-a76c-4a0a-8caa-e93998e7bc09",
"marking-definition--57fcd772-9c1d-41b0-8d1f-3d47713415d9",
"marking-definition--462bf1a6-03d2-419c-b74e-eee2238b2de4",
"marking-definition--68520ae2-fefe-43a9-84ee-2c2a934d2c7d",
"marking-definition--2802dfb1-1019-40a8-8848-68d0ec0e417f",
]
# All required args for a Campaign instance, plus some optional args
CAMPAIGN_MORE_KWARGS = dict(
type='campaign',
id=CAMPAIGN_ID,
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
)
# Minimum required args for an Identity instance
IDENTITY_KWARGS = dict(
name="John Smith",
@ -38,6 +58,17 @@ MALWARE_KWARGS = dict(
name="Cryptolocker",
)
# All required args for a Malware instance, plus some optional args
MALWARE_MORE_KWARGS = dict(
type='malware',
id=MALWARE_ID,
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
labels=['ransomware'],
name="Cryptolocker",
description="A ransomware related to ..."
)
# Minimum required args for a Relationship instance
RELATIONSHIP_KWARGS = dict(
relationship_type="indicates",

File diff suppressed because it is too large Load Diff

View File

@ -103,7 +103,7 @@ def test_marking_def_invalid_type():
stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="my-definiition-type",
definition_type="my-definition-type",
definition=stix2.StatementMarking("Copyright 2016, Example Corp")
)

View File

@ -0,0 +1,534 @@
import pytest
from stix2 import Malware, exceptions, markings
from .constants import FAKE_TIME, MALWARE_ID, MARKING_IDS
from .constants import MALWARE_KWARGS as MALWARE_KWARGS_CONST
"""Tests for the Data Markings API."""
MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy()
MALWARE_KWARGS.update({
'id': MALWARE_ID,
'created': FAKE_TIME,
'modified': FAKE_TIME,
})
@pytest.mark.parametrize("data", [
(
Malware(**MALWARE_KWARGS),
Malware(object_marking_refs=[MARKING_IDS[0]],
**MALWARE_KWARGS),
),
(
MALWARE_KWARGS,
dict(object_marking_refs=[MARKING_IDS[0]],
**MALWARE_KWARGS),
),
])
def test_add_markings_one_marking(data):
before = data[0]
after = data[1]
before = markings.add_markings(before, MARKING_IDS[0], None)
for m in before["object_marking_refs"]:
assert m in after["object_marking_refs"]
def test_add_markings_multiple_marking():
before = Malware(
**MALWARE_KWARGS
)
after = Malware(
object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1]],
**MALWARE_KWARGS
)
before = markings.add_markings(before, [MARKING_IDS[0], MARKING_IDS[1]], None)
for m in before["object_marking_refs"]:
assert m in after["object_marking_refs"]
def test_add_markings_combination():
before = Malware(
**MALWARE_KWARGS
)
after = Malware(
object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1]],
granular_markings=[
{
"selectors": ["labels"],
"marking_ref": MARKING_IDS[2]
},
{
"selectors": ["name"],
"marking_ref": MARKING_IDS[3]
}
],
**MALWARE_KWARGS
)
before = markings.add_markings(before, MARKING_IDS[0], None)
before = markings.add_markings(before, MARKING_IDS[1], None)
before = markings.add_markings(before, MARKING_IDS[2], "labels")
before = markings.add_markings(before, MARKING_IDS[3], "name")
for m in before["granular_markings"]:
assert m in after["granular_markings"]
for m in before["object_marking_refs"]:
assert m in after["object_marking_refs"]
@pytest.mark.parametrize("data", [
([""]),
(""),
([]),
([MARKING_IDS[0], 456])
])
def test_add_markings_bad_markings(data):
before = Malware(
**MALWARE_KWARGS
)
with pytest.raises(exceptions.InvalidValueError):
before = markings.add_markings(before, data, None)
assert "object_marking_refs" not in before
GET_MARKINGS_TEST_DATA = \
{
"a": 333,
"b": "value",
"c": [
17,
"list value",
{
"g": "nested",
"h": 45
}
],
"x": {
"y": [
"hello",
88
],
"z": {
"foo1": "bar",
"foo2": 65
}
},
"object_marking_refs": ["11"],
"granular_markings": [
{
"marking_ref": "1",
"selectors": ["a"]
},
{
"marking_ref": "2",
"selectors": ["c"]
},
{
"marking_ref": "3",
"selectors": ["c.[1]"]
},
{
"marking_ref": "4",
"selectors": ["c.[2]"]
},
{
"marking_ref": "5",
"selectors": ["c.[2].g"]
},
{
"marking_ref": "6",
"selectors": ["x"]
},
{
"marking_ref": "7",
"selectors": ["x.y"]
},
{
"marking_ref": "8",
"selectors": ["x.y.[1]"]
},
{
"marking_ref": "9",
"selectors": ["x.z"]
},
{
"marking_ref": "10",
"selectors": ["x.z.foo2"]
},
]
}
@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA])
def test_get_markings_object_marking(data):
assert set(markings.get_markings(data, None)) == set(["11"])
@pytest.mark.parametrize("data", [GET_MARKINGS_TEST_DATA])
def test_get_markings_object_and_granular_combinations(data):
"""Test multiple combinations for inherited and descendant markings."""
assert set(markings.get_markings(data, "a", False, False)) == set(["1"])
assert set(markings.get_markings(data, "a", True, False)) == set(["1", "11"])
assert set(markings.get_markings(data, "a", True, True)) == set(["1", "11"])
assert set(markings.get_markings(data, "a", False, True)) == set(["1"])
assert set(markings.get_markings(data, "b", False, False)) == set([])
assert set(markings.get_markings(data, "b", True, False)) == set(["11"])
assert set(markings.get_markings(data, "b", True, True)) == set(["11"])
assert set(markings.get_markings(data, "b", False, True)) == set([])
assert set(markings.get_markings(data, "c", False, False)) == set(["2"])
assert set(markings.get_markings(data, "c", True, False)) == set(["2", "11"])
assert set(markings.get_markings(data, "c", True, True)) == set(["2", "3", "4", "5", "11"])
assert set(markings.get_markings(data, "c", False, True)) == set(["2", "3", "4", "5"])
assert set(markings.get_markings(data, "c.[0]", False, False)) == set([])
assert set(markings.get_markings(data, "c.[0]", True, False)) == set(["2", "11"])
assert set(markings.get_markings(data, "c.[0]", True, True)) == set(["2", "11"])
assert set(markings.get_markings(data, "c.[0]", False, True)) == set([])
assert set(markings.get_markings(data, "c.[1]", False, False)) == set(["3"])
assert set(markings.get_markings(data, "c.[1]", True, False)) == set(["2", "3", "11"])
assert set(markings.get_markings(data, "c.[1]", True, True)) == set(["2", "3", "11"])
assert set(markings.get_markings(data, "c.[1]", False, True)) == set(["3"])
assert set(markings.get_markings(data, "c.[2]", False, False)) == set(["4"])
assert set(markings.get_markings(data, "c.[2]", True, False)) == set(["2", "4", "11"])
assert set(markings.get_markings(data, "c.[2]", True, True)) == set(["2", "4", "5", "11"])
assert set(markings.get_markings(data, "c.[2]", False, True)) == set(["4", "5"])
assert set(markings.get_markings(data, "c.[2].g", False, False)) == set(["5"])
assert set(markings.get_markings(data, "c.[2].g", True, False)) == set(["2", "4", "5", "11"])
assert set(markings.get_markings(data, "c.[2].g", True, True)) == set(["2", "4", "5", "11"])
assert set(markings.get_markings(data, "c.[2].g", False, True)) == set(["5"])
assert set(markings.get_markings(data, "x", False, False)) == set(["6"])
assert set(markings.get_markings(data, "x", True, False)) == set(["6", "11"])
assert set(markings.get_markings(data, "x", True, True)) == set(["6", "7", "8", "9", "10", "11"])
assert set(markings.get_markings(data, "x", False, True)) == set(["6", "7", "8", "9", "10"])
assert set(markings.get_markings(data, "x.y", False, False)) == set(["7"])
assert set(markings.get_markings(data, "x.y", True, False)) == set(["6", "7", "11"])
assert set(markings.get_markings(data, "x.y", True, True)) == set(["6", "7", "8", "11"])
assert set(markings.get_markings(data, "x.y", False, True)) == set(["7", "8"])
assert set(markings.get_markings(data, "x.y.[0]", False, False)) == set([])
assert set(markings.get_markings(data, "x.y.[0]", True, False)) == set(["6", "7", "11"])
assert set(markings.get_markings(data, "x.y.[0]", True, True)) == set(["6", "7", "11"])
assert set(markings.get_markings(data, "x.y.[0]", False, True)) == set([])
assert set(markings.get_markings(data, "x.y.[1]", False, False)) == set(["8"])
assert set(markings.get_markings(data, "x.y.[1]", True, False)) == set(["6", "7", "8", "11"])
assert set(markings.get_markings(data, "x.y.[1]", True, True)) == set(["6", "7", "8", "11"])
assert set(markings.get_markings(data, "x.y.[1]", False, True)) == set(["8"])
assert set(markings.get_markings(data, "x.z", False, False)) == set(["9"])
assert set(markings.get_markings(data, "x.z", True, False)) == set(["6", "9", "11"])
assert set(markings.get_markings(data, "x.z", True, True)) == set(["6", "9", "10", "11"])
assert set(markings.get_markings(data, "x.z", False, True)) == set(["9", "10"])
assert set(markings.get_markings(data, "x.z.foo1", False, False)) == set([])
assert set(markings.get_markings(data, "x.z.foo1", True, False)) == set(["6", "9", "11"])
assert set(markings.get_markings(data, "x.z.foo1", True, True)) == set(["6", "9", "11"])
assert set(markings.get_markings(data, "x.z.foo1", False, True)) == set([])
assert set(markings.get_markings(data, "x.z.foo2", False, False)) == set(["10"])
assert set(markings.get_markings(data, "x.z.foo2", True, False)) == set(["6", "9", "10", "11"])
assert set(markings.get_markings(data, "x.z.foo2", True, True)) == set(["6", "9", "10", "11"])
assert set(markings.get_markings(data, "x.z.foo2", False, True)) == set(["10"])
@pytest.mark.parametrize("data", [
(
Malware(object_marking_refs=[MARKING_IDS[0]],
**MALWARE_KWARGS),
Malware(**MALWARE_KWARGS),
),
(
dict(object_marking_refs=[MARKING_IDS[0]],
**MALWARE_KWARGS),
MALWARE_KWARGS,
),
])
def test_remove_markings_object_level(data):
before = data[0]
after = data[1]
before = markings.remove_markings(before, MARKING_IDS[0], None)
assert 'object_marking_refs' not in before
assert 'object_marking_refs' not in after
modified = after['modified']
after = markings.remove_markings(after, MARKING_IDS[0], None)
modified == after['modified']
@pytest.mark.parametrize("data", [
(
Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS),
Malware(object_marking_refs=[MARKING_IDS[1]],
**MALWARE_KWARGS),
),
(
dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS),
dict(object_marking_refs=[MARKING_IDS[1]],
**MALWARE_KWARGS),
),
])
def test_remove_markings_multiple(data):
before = data[0]
after = data[1]
before = markings.remove_markings(before, [MARKING_IDS[0], MARKING_IDS[2]], None)
assert before['object_marking_refs'] == after['object_marking_refs']
def test_remove_markings_bad_markings():
before = Malware(
object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS
)
with pytest.raises(AssertionError) as excinfo:
markings.remove_markings(before, [MARKING_IDS[4]], None)
assert str(excinfo.value) == "Marking ['%s'] was not found in Malware!" % MARKING_IDS[4]
@pytest.mark.parametrize("data", [
(
Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS),
Malware(**MALWARE_KWARGS),
),
(
dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS),
MALWARE_KWARGS,
),
])
def test_clear_markings(data):
before = data[0]
after = data[1]
before = markings.clear_markings(before, None)
assert 'object_marking_refs' not in before
assert 'object_marking_refs' not in after
def test_is_marked_object_and_granular_combinations():
"""Test multiple combinations for inherited and descendant markings."""
test_sdo = \
{
"a": 333,
"b": "value",
"c": [
17,
"list value",
{
"g": "nested",
"h": 45
}
],
"x": {
"y": [
"hello",
88
],
"z": {
"foo1": "bar",
"foo2": 65
}
},
"object_marking_refs": "11",
"granular_markings": [
{
"marking_ref": "1",
"selectors": ["a"]
},
{
"marking_ref": "2",
"selectors": ["c"]
},
{
"marking_ref": "3",
"selectors": ["c.[1]"]
},
{
"marking_ref": "4",
"selectors": ["c.[2]"]
},
{
"marking_ref": "5",
"selectors": ["c.[2].g"]
},
{
"marking_ref": "6",
"selectors": ["x"]
},
{
"marking_ref": "7",
"selectors": ["x.y"]
},
{
"marking_ref": "8",
"selectors": ["x.y.[1]"]
},
{
"marking_ref": "9",
"selectors": ["x.z"]
},
{
"marking_ref": "10",
"selectors": ["x.z.foo2"]
},
]
}
assert markings.is_marked(test_sdo, ["1"], "a", False, False)
assert markings.is_marked(test_sdo, ["1", "11"], "a", True, False)
assert markings.is_marked(test_sdo, ["1", "11"], "a", True, True)
assert markings.is_marked(test_sdo, ["1"], "a", False, True)
assert markings.is_marked(test_sdo, "b", inherited=False, descendants=False) is False
assert markings.is_marked(test_sdo, ["11"], "b", True, False)
assert markings.is_marked(test_sdo, ["11"], "b", True, True)
assert markings.is_marked(test_sdo, "b", inherited=False, descendants=True) is False
assert markings.is_marked(test_sdo, ["2"], "c", False, False)
assert markings.is_marked(test_sdo, ["2", "11"], "c", True, False)
assert markings.is_marked(test_sdo, ["2", "3", "4", "5", "11"], "c", True, True)
assert markings.is_marked(test_sdo, ["2", "3", "4", "5"], "c", False, True)
assert markings.is_marked(test_sdo, "c.[0]", inherited=False, descendants=False) is False
assert markings.is_marked(test_sdo, ["2", "11"], "c.[0]", True, False)
assert markings.is_marked(test_sdo, ["2", "11"], "c.[0]", True, True)
assert markings.is_marked(test_sdo, "c.[0]", inherited=False, descendants=True) is False
assert markings.is_marked(test_sdo, ["3"], "c.[1]", False, False)
assert markings.is_marked(test_sdo, ["2", "3", "11"], "c.[1]", True, False)
assert markings.is_marked(test_sdo, ["2", "3", "11"], "c.[1]", True, True)
assert markings.is_marked(test_sdo, ["3"], "c.[1]", False, True)
assert markings.is_marked(test_sdo, ["4"], "c.[2]", False, False)
assert markings.is_marked(test_sdo, ["2", "4", "11"], "c.[2]", True, False)
assert markings.is_marked(test_sdo, ["2", "4", "5", "11"], "c.[2]", True, True)
assert markings.is_marked(test_sdo, ["4", "5"], "c.[2]", False, True)
assert markings.is_marked(test_sdo, ["5"], "c.[2].g", False, False)
assert markings.is_marked(test_sdo, ["2", "4", "5", "11"], "c.[2].g", True, False)
assert markings.is_marked(test_sdo, ["2", "4", "5", "11"], "c.[2].g", True, True)
assert markings.is_marked(test_sdo, ["5"], "c.[2].g", False, True)
assert markings.is_marked(test_sdo, ["6"], "x", False, False)
assert markings.is_marked(test_sdo, ["6", "11"], "x", True, False)
assert markings.is_marked(test_sdo, ["6", "7", "8", "9", "10", "11"], "x", True, True)
assert markings.is_marked(test_sdo, ["6", "7", "8", "9", "10"], "x", False, True)
assert markings.is_marked(test_sdo, ["7"], "x.y", False, False)
assert markings.is_marked(test_sdo, ["6", "7", "11"], "x.y", True, False)
assert markings.is_marked(test_sdo, ["6", "7", "8", "11"], "x.y", True, True)
assert markings.is_marked(test_sdo, ["7", "8"], "x.y", False, True)
assert markings.is_marked(test_sdo, "x.y.[0]", inherited=False, descendants=False) is False
assert markings.is_marked(test_sdo, ["6", "7", "11"], "x.y.[0]", True, False)
assert markings.is_marked(test_sdo, ["6", "7", "11"], "x.y.[0]", True, True)
assert markings.is_marked(test_sdo, "x.y.[0]", inherited=False, descendants=True) is False
assert markings.is_marked(test_sdo, ["8"], "x.y.[1]", False, False)
assert markings.is_marked(test_sdo, ["6", "7", "8", "11"], "x.y.[1]", True, False)
assert markings.is_marked(test_sdo, ["6", "7", "8", "11"], "x.y.[1]", True, True)
assert markings.is_marked(test_sdo, ["8"], "x.y.[1]", False, True)
assert markings.is_marked(test_sdo, ["9"], "x.z", False, False)
assert markings.is_marked(test_sdo, ["6", "9", "11"], "x.z", True, False)
assert markings.is_marked(test_sdo, ["6", "9", "10", "11"], "x.z", True, True)
assert markings.is_marked(test_sdo, ["9", "10"], "x.z", False, True)
assert markings.is_marked(test_sdo, "x.z.foo1", inherited=False, descendants=False) is False
assert markings.is_marked(test_sdo, ["6", "9", "11"], "x.z.foo1", True, False)
assert markings.is_marked(test_sdo, ["6", "9", "11"], "x.z.foo1", True, True)
assert markings.is_marked(test_sdo, "x.z.foo1", inherited=False, descendants=True) is False
assert markings.is_marked(test_sdo, ["10"], "x.z.foo2", False, False)
assert markings.is_marked(test_sdo, ["6", "9", "10", "11"], "x.z.foo2", True, False)
assert markings.is_marked(test_sdo, ["6", "9", "10", "11"], "x.z.foo2", True, True)
assert markings.is_marked(test_sdo, ["10"], "x.z.foo2", False, True)
assert markings.is_marked(test_sdo, ["11"], None, True, True)
assert markings.is_marked(test_sdo, ["2"], None, True, True) is False
@pytest.mark.parametrize("data", [
(
Malware(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS),
Malware(**MALWARE_KWARGS),
),
(
dict(object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS),
MALWARE_KWARGS,
),
])
def test_is_marked_no_markings(data):
marked = data[0]
nonmarked = data[1]
assert markings.is_marked(marked)
assert markings.is_marked(nonmarked) is False
def test_set_marking():
before = Malware(
object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS
)
after = Malware(
object_marking_refs=[MARKING_IDS[4], MARKING_IDS[5]],
**MALWARE_KWARGS
)
before = markings.set_markings(before, [MARKING_IDS[4], MARKING_IDS[5]], None)
for m in before["object_marking_refs"]:
assert m in [MARKING_IDS[4], MARKING_IDS[5]]
assert [MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]] not in before["object_marking_refs"]
for x in before["object_marking_refs"]:
assert x in after["object_marking_refs"]
@pytest.mark.parametrize("data", [
([]),
([""]),
(""),
([MARKING_IDS[4], 687])
])
def test_set_marking_bad_input(data):
before = Malware(
object_marking_refs=[MARKING_IDS[0]],
**MALWARE_KWARGS
)
after = Malware(
object_marking_refs=[MARKING_IDS[0]],
**MALWARE_KWARGS
)
with pytest.raises(exceptions.InvalidValueError):
before = markings.set_markings(before, data, None)
assert before == after

View File

@ -2,16 +2,11 @@ import pytest
import stix2
from .constants import CAMPAIGN_MORE_KWARGS
def test_making_new_version():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = campaign_v1.new_version(name="fred")
@ -25,14 +20,7 @@ def test_making_new_version():
def test_making_new_version_with_unset():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = campaign_v1.new_version(description=None)
@ -47,16 +35,11 @@ def test_making_new_version_with_unset():
def test_making_new_version_with_embedded_object():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
external_references=[{
"source_name": "capec",
"external_id": "CAPEC-163"
}],
description="Campaign by Green Group against a series of targets in the financial services sector."
**CAMPAIGN_MORE_KWARGS
)
campaign_v2 = campaign_v1.new_version(external_references=[{
@ -74,14 +57,7 @@ def test_making_new_version_with_embedded_object():
def test_revoke():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = campaign_v1.revoke()
@ -96,14 +72,7 @@ def test_revoke():
def test_versioning_error_invalid_property():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as excinfo:
campaign_v1.new_version(type="threat-actor")
@ -112,14 +81,7 @@ def test_versioning_error_invalid_property():
def test_versioning_error_bad_modified_value():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
campaign_v1.new_version(modified="2015-04-06T20:03:00.000Z")
@ -135,14 +97,7 @@ def test_versioning_error_bad_modified_value():
def test_versioning_error_usetting_required_property():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
campaign_v1.new_version(name=None)
@ -156,15 +111,7 @@ def test_versioning_error_usetting_required_property():
def test_versioning_error_new_version_of_revoked():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = campaign_v1.revoke()
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
@ -176,15 +123,7 @@ def test_versioning_error_new_version_of_revoked():
def test_versioning_error_revoke_of_revoked():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
campaign_v2 = campaign_v1.revoke()
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
@ -193,3 +132,77 @@ def test_versioning_error_revoke_of_revoked():
assert excinfo.value.called_by == "revoke"
assert str(excinfo.value) == "Cannot revoke an already revoked object."
def test_making_new_version_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, name="fred")
assert campaign_v1['id'] == campaign_v2['id']
assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref']
assert campaign_v1['created'] == campaign_v2['created']
assert campaign_v1['name'] != campaign_v2['name']
assert campaign_v2['name'] == "fred"
assert campaign_v1['description'] == campaign_v2['description']
assert stix2.utils.parse_into_datetime(campaign_v1['modified'], precision='millisecond') < campaign_v2['modified']
def test_versioning_error_dict_bad_modified_value():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.utils.new_version(CAMPAIGN_MORE_KWARGS, modified="2015-04-06T20:03:00.000Z")
assert excinfo.value.cls == dict
assert excinfo.value.prop_name == "modified"
assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime."
def test_versioning_error_dict_no_modified_value():
campaign_v1 = {
'type': 'campaign',
'id': "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
'created': "2016-04-06T20:03:00.000Z",
'name': "Green Group Attacks Against Finance",
}
campaign_v2 = stix2.utils.new_version(campaign_v1, modified="2017-04-06T20:03:00.000Z")
assert str(campaign_v2['modified']) == "2017-04-06T20:03:00.000Z"
def test_making_new_version_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
stix2.utils.new_version(campaign_v1, name="fred")
assert 'cannot create new version of object of this type' in str(excinfo.value)
def test_revoke_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.revoke(campaign_v1)
assert campaign_v1['id'] == campaign_v2['id']
assert campaign_v1['created_by_ref'] == campaign_v2['created_by_ref']
assert campaign_v1['created'] == campaign_v2['created']
assert campaign_v1['name'] == campaign_v2['name']
assert campaign_v1['description'] == campaign_v2['description']
assert stix2.utils.parse_into_datetime(campaign_v1['modified'], precision='millisecond') < campaign_v2['modified']
assert campaign_v2['revoked']
def test_versioning_error_revoke_of_revoked_dict():
campaign_v1 = CAMPAIGN_MORE_KWARGS
campaign_v2 = stix2.utils.revoke(campaign_v1)
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
stix2.utils.revoke(campaign_v2)
assert excinfo.value.called_by == "revoke"
def test_revoke_invalid_cls():
campaign_v1 = "This is a campaign."
with pytest.raises(ValueError) as excinfo:
stix2.utils.revoke(campaign_v1)
assert 'cannot revoke object of this type' in str(excinfo.value)

View File

@ -1,11 +1,16 @@
"""Utility functions and classes for the stix2 library."""
from collections import Mapping
import copy
import datetime as dt
import json
from dateutil import parser
import pytz
from .exceptions import (InvalidValueError, RevokeError,
UnmodifiablePropertyError)
# Sentinel value for properties that should be set to the current time.
# We can't use the standard 'default' approach, since if there are multiple
# timestamps in a single object, the timestamps will vary by a few microseconds.
@ -150,3 +155,51 @@ def find_property_index(obj, properties, tuple_to_find):
tuple_to_find)
if val is not None:
return val
def new_version(data, **kwargs):
"""Create a new version of a STIX object, by modifying properties and
updating the `modified` property.
"""
if not isinstance(data, Mapping):
raise ValueError('cannot create new version of object of this type! '
'Try a dictionary or instance of an SDO or SRO class.')
unchangable_properties = []
if data.get("revoked"):
raise RevokeError("new_version")
try:
new_obj_inner = copy.deepcopy(data._inner)
except AttributeError:
new_obj_inner = copy.deepcopy(data)
properties_to_change = kwargs.keys()
# Make sure certain properties aren't trying to change
for prop in ["created", "created_by_ref", "id", "type"]:
if prop in properties_to_change:
unchangable_properties.append(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
cls = type(data)
if 'modified' not in kwargs:
kwargs['modified'] = get_timestamp()
elif 'modified' in data:
old_modified_property = parse_into_datetime(data.get('modified'), precision='millisecond')
new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond')
if new_modified_property < old_modified_property:
raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.")
new_obj_inner.update(kwargs)
# Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass
return cls(**{k: v for k, v in new_obj_inner.items() if v is not None})
def revoke(data):
if not isinstance(data, Mapping):
raise ValueError('cannot revoke object of this type! Try a dictionary '
'or instance of an SDO or SRO class.')
if data.get("revoked"):
raise RevokeError("revoke")
return new_version(data, revoked=True)

View File

@ -1,5 +1,5 @@
[tox]
envlist = py26,py27,py33,py34,py35,py36,pycodestyle,isort-check
envlist = py27,py33,py34,py35,py36,pycodestyle,isort-check
[testenv]
deps =
@ -36,7 +36,6 @@ commands =
[travis]
python =
2.6: py26
2.7: py27, pycodestyle
3.3: py33, pycodestyle
3.4: py34, pycodestyle