Merge pull request #4 from rpiazza/versioning

Added versioning api, with tests
stix2.1
Greg Back 2017-05-05 10:15:06 -05:00 committed by GitHub
commit 0117eee042
10 changed files with 285 additions and 17 deletions

View File

@ -1,12 +1,15 @@
"""Base class for type definitions in the stix2 library.""" """Base class for type definitions in the stix2 library."""
import collections import collections
import copy
import datetime as dt import datetime as dt
import json import json
from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \ from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \
MissingFieldsError MissingFieldsError, RevokeError, UnmodifiablePropertyError
from .utils import format_datetime, get_timestamp, NOW from .utils import format_datetime, get_timestamp, NOW, parse_into_datetime
__all__ = ['STIXJSONEncoder', '_STIXBase'] __all__ = ['STIXJSONEncoder', '_STIXBase']
@ -56,16 +59,22 @@ class _STIXBase(collections.Mapping):
if extra_kwargs: if extra_kwargs:
raise ExtraFieldsError(cls, extra_kwargs) raise ExtraFieldsError(cls, extra_kwargs)
# Remove any keyword arguments whose value is None
setting_kwargs = {}
for prop_name, prop_value in kwargs.items():
if prop_value:
setting_kwargs[prop_name] = prop_value
# Detect any missing required fields # Detect any missing required fields
required_fields = get_required_properties(cls._properties) required_fields = get_required_properties(cls._properties)
missing_kwargs = set(required_fields) - set(kwargs) missing_kwargs = set(required_fields) - set(setting_kwargs)
if missing_kwargs: if missing_kwargs:
raise MissingFieldsError(cls, missing_kwargs) raise MissingFieldsError(cls, missing_kwargs)
for prop_name, prop_metadata in cls._properties.items(): for prop_name, prop_metadata in cls._properties.items():
self._check_property(prop_name, prop_metadata, kwargs) self._check_property(prop_name, prop_metadata, setting_kwargs)
self._inner = kwargs self._inner = setting_kwargs
if self.granular_markings: if self.granular_markings:
for m in self.granular_markings: for m in self.granular_markings:
@ -99,3 +108,37 @@ class _STIXBase(collections.Mapping):
props = [(k, self[k]) for k in sorted(self._properties) if self.get(k)] props = [(k, self[k]) for k in sorted(self._properties) if self.get(k)]
return "{0}({1})".format(self.__class__.__name__, return "{0}({1})".format(self.__class__.__name__,
", ".join(["{0!s}={1!r}".format(k, v) for k, v in props])) ", ".join(["{0!s}={1!r}".format(k, v) for k, v in props]))
def __deepcopy__(self, memo):
# Assumption: we can ignore the memo argument, because no object will ever contain the same sub-object multiple times.
new_inner = copy.deepcopy(self._inner, memo)
cls = type(self)
return cls(**new_inner)
# Versioning API
def new_version(self, **kwargs):
unchangable_properties = []
if self.revoked:
raise RevokeError("new_version")
new_obj_inner = copy.deepcopy(self._inner)
properties_to_change = kwargs.keys()
for prop in ["created", "created_by_ref", "id", "type"]:
if prop in properties_to_change:
unchangable_properties.append(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
cls = type(self)
if 'modified' not in kwargs:
kwargs['modified'] = get_timestamp()
else:
new_modified_property = parse_into_datetime(kwargs['modified'])
if new_modified_property < self.modified:
raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.")
new_obj_inner.update(kwargs)
return cls(**new_obj_inner)
def revoke(self):
if self.revoked:
raise RevokeError("revoke")
return self.new_version(revoked=True)

View File

@ -25,7 +25,7 @@ class MissingFieldsError(STIXError, ValueError):
self.fields = sorted(list(fields)) self.fields = sorted(list(fields))
def __str__(self): def __str__(self):
msg = "Missing required field(s) for {0}: ({1})." msg = "No values for required field(s) for {0}: ({1})."
return msg.format(self.cls.__name__, return msg.format(self.cls.__name__,
", ".join(x for x in self.fields)) ", ".join(x for x in self.fields))
@ -49,3 +49,29 @@ class ImmutableError(STIXError, ValueError):
def __init__(self): def __init__(self):
super(ImmutableError, self).__init__("Cannot modify properties after creation.") super(ImmutableError, self).__init__("Cannot modify properties after creation.")
class UnmodifiablePropertyError(STIXError, ValueError):
"""Attempted to modify an unmodifiable property of object when creating a new version"""
def __init__(self, unchangable_properties):
super(UnmodifiablePropertyError, self).__init__()
self.unchangable_properties = unchangable_properties
def __str__(self):
msg = "These properties cannot be changed when making a new version: {0}."
return msg.format(", ".join(self.unchangable_properties))
class RevokeError(STIXError, ValueError):
"""Attempted to an operation on a revoked object"""
def __init__(self, called_by):
super(RevokeError, self).__init__()
self.called_by = called_by
def __str__(self):
if self.called_by == "revoke":
return "Cannot revoke an already revoked object."
else:
return "Cannot create a new version of a revoked object."

View File

@ -39,6 +39,7 @@ RELATIONSHIP_KWARGS = dict(
target_ref=MALWARE_ID, target_ref=MALWARE_ID,
) )
# Minimum required args for a Sighting instance
SIGHTING_KWARGS = dict( SIGHTING_KWARGS = dict(
sighting_of_ref=INDICATOR_ID, sighting_of_ref=INDICATOR_ID,
) )

View File

@ -113,4 +113,3 @@ def test_external_reference_source_required():
assert excinfo.value.cls == stix2.ExternalReference assert excinfo.value.cls == stix2.ExternalReference
assert excinfo.value.fields == ["source_name"] assert excinfo.value.fields == ["source_name"]
assert str(excinfo.value) == "Missing required field(s) for ExternalReference: (source_name)."

View File

@ -93,7 +93,7 @@ def test_indicator_required_fields():
assert excinfo.value.cls == stix2.Indicator assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ["labels", "pattern"] assert excinfo.value.fields == ["labels", "pattern"]
assert str(excinfo.value) == "Missing required field(s) for Indicator: (labels, pattern)." assert str(excinfo.value) == "No values for required field(s) for Indicator: (labels, pattern)."
def test_indicator_required_field_pattern(): def test_indicator_required_field_pattern():
@ -102,7 +102,6 @@ def test_indicator_required_field_pattern():
assert excinfo.value.cls == stix2.Indicator assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ["pattern"] assert excinfo.value.fields == ["pattern"]
assert str(excinfo.value) == "Missing required field(s) for Indicator: (pattern)."
def test_indicator_created_ref_invalid_format(): def test_indicator_created_ref_invalid_format():

View File

@ -41,7 +41,6 @@ def test_kill_chain_required_fields():
assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["kill_chain_name", "phase_name"] assert excinfo.value.fields == ["kill_chain_name", "phase_name"]
assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (kill_chain_name, phase_name)."
def test_kill_chain_required_field_chain_name(): def test_kill_chain_required_field_chain_name():
@ -51,7 +50,6 @@ def test_kill_chain_required_field_chain_name():
assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["kill_chain_name"] assert excinfo.value.fields == ["kill_chain_name"]
assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (kill_chain_name)."
def test_kill_chain_required_field_phase_name(): def test_kill_chain_required_field_phase_name():
@ -61,4 +59,3 @@ def test_kill_chain_required_field_phase_name():
assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["phase_name"] assert excinfo.value.fields == ["phase_name"]
assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (phase_name)."

View File

@ -76,7 +76,6 @@ def test_malware_required_fields():
assert excinfo.value.cls == stix2.Malware assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ["labels", "name"] assert excinfo.value.fields == ["labels", "name"]
assert str(excinfo.value) == "Missing required field(s) for Malware: (labels, name)."
def test_malware_required_field_name(): def test_malware_required_field_name():
@ -85,7 +84,6 @@ def test_malware_required_field_name():
assert excinfo.value.cls == stix2.Malware assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ["name"] assert excinfo.value.fields == ["name"]
assert str(excinfo.value) == "Missing required field(s) for Malware: (name)."
def test_cannot_assign_to_malware_attributes(malware): def test_cannot_assign_to_malware_attributes(malware):

View File

@ -76,13 +76,16 @@ def test_relationship_id_must_start_with_relationship():
def test_relationship_required_field_relationship_type(): def test_relationship_required_field_relationship_type():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship() stix2.Relationship()
assert str(excinfo.value) == "Missing required field(s) for Relationship: (relationship_type, source_ref, target_ref)." assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["relationship_type", "source_ref", "target_ref"]
def test_relationship_missing_some_required_fields(): def test_relationship_missing_some_required_fields():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship(relationship_type='indicates') stix2.Relationship(relationship_type='indicates')
assert str(excinfo.value) == "Missing required field(s) for Relationship: (source_ref, target_ref)."
assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["source_ref", "target_ref"]
def test_relationship_required_field_target_ref(): def test_relationship_required_field_target_ref():
@ -94,7 +97,6 @@ def test_relationship_required_field_target_ref():
assert excinfo.value.cls == stix2.Relationship assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["target_ref"] assert excinfo.value.fields == ["target_ref"]
assert str(excinfo.value) == "Missing required field(s) for Relationship: (target_ref)."
def test_cannot_assign_to_relationship_attributes(relationship): def test_cannot_assign_to_relationship_attributes(relationship):

View File

@ -0,0 +1,180 @@
import pytest
import stix2
def test_making_new_version():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.new_version(name="fred")
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.created_by_ref == campaign_v2.created_by_ref
assert campaign_v1.created == campaign_v2.created
assert campaign_v1.name != campaign_v2.name
assert campaign_v2.name == "fred"
assert campaign_v1.description == campaign_v2.description
assert campaign_v1.modified < campaign_v2.modified
def test_making_new_version_with_unset():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.new_version(description=None)
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.created_by_ref == campaign_v2.created_by_ref
assert campaign_v1.created == campaign_v2.created
assert campaign_v1.name == campaign_v2.name
assert campaign_v2.description is None
assert campaign_v1.modified < campaign_v2.modified
def test_making_new_version_with_embedded_object():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
external_references=[{
"source_name": "capec",
"external_id": "CAPEC-163"
}],
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.new_version(external_references=[{
"source_name": "capec",
"external_id": "CAPEC-164"
}])
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.created_by_ref == campaign_v2.created_by_ref
assert campaign_v1.created == campaign_v2.created
assert campaign_v1.name == campaign_v2.name
assert campaign_v1.description == campaign_v2.description
assert campaign_v1.modified < campaign_v2.modified
assert campaign_v1.external_references[0].external_id != campaign_v2.external_references[0].external_id
def test_revoke():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.revoke()
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.created_by_ref == campaign_v2.created_by_ref
assert campaign_v1.created == campaign_v2.created
assert campaign_v1.name == campaign_v2.name
assert campaign_v1.description == campaign_v2.description
assert campaign_v1.modified < campaign_v2.modified
assert campaign_v2.revoked
def test_versioning_error_invalid_property():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as excinfo:
campaign_v1.new_version(type="threat-actor")
assert str(excinfo.value) == "These properties cannot be changed when making a new version: type."
def test_versioning_error_bad_modified_value():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
campaign_v1.new_version(modified="2015-04-06T20:03:00.000Z")
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.prop_name == "modified"
assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime."
def test_versioning_error_usetting_required_property():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
campaign_v1.new_version(name=None)
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.fields == ["name"]
def test_versioning_error_new_version_of_revoked():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.revoke()
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.new_version(name="barney")
assert excinfo.value.called_by == "new_version"
def test_versioning_error_revoke_of_revoked():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.revoke()
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.revoke()
assert excinfo.value.called_by == "revoke"

View File

@ -3,6 +3,7 @@
import datetime as dt import datetime as dt
import json import json
from dateutil import parser
import pytz import pytz
# Sentinel value for fields that should be set to the current time. # Sentinel value for fields that should be set to the current time.
@ -34,6 +35,28 @@ def format_datetime(dttm):
return ts + "Z" return ts + "Z"
def parse_into_datetime(value):
if isinstance(value, dt.date):
if hasattr(value, 'hour'):
return value
else:
# Add a time component
return dt.datetime.combine(value, dt.time(), tzinfo=pytz.utc)
# value isn't a date or datetime object so assume it's a string
try:
parsed = parser.parse(value)
except TypeError:
# Unknown format
raise ValueError("must be a datetime object, date object, or "
"timestamp string in a recognizable format.")
if parsed.tzinfo:
return parsed.astimezone(pytz.utc)
else:
# Doesn't have timezone info in the string; assume UTC
return pytz.utc.localize(parsed)
def get_dict(data): def get_dict(data):
"""Return data as a dictionary. """Return data as a dictionary.
Input can be a dictionary, string, or file-like object. Input can be a dictionary, string, or file-like object.