diff --git a/stix2/base.py b/stix2/base.py index 05bf545..1c90dab 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -1,12 +1,15 @@ """Base class for type definitions in the stix2 library.""" import collections +import copy import datetime as dt + import json + from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \ - MissingFieldsError -from .utils import format_datetime, get_timestamp, NOW + MissingFieldsError, RevokeError, UnmodifiablePropertyError +from .utils import format_datetime, get_timestamp, NOW, parse_into_datetime __all__ = ['STIXJSONEncoder', '_STIXBase'] @@ -56,16 +59,22 @@ class _STIXBase(collections.Mapping): if extra_kwargs: raise ExtraFieldsError(cls, extra_kwargs) + # Remove any keyword arguments whose value is None + setting_kwargs = {} + for prop_name, prop_value in kwargs.items(): + if prop_value: + setting_kwargs[prop_name] = prop_value + # Detect any missing required fields required_fields = get_required_properties(cls._properties) - missing_kwargs = set(required_fields) - set(kwargs) + missing_kwargs = set(required_fields) - set(setting_kwargs) if missing_kwargs: raise MissingFieldsError(cls, missing_kwargs) for prop_name, prop_metadata in cls._properties.items(): - self._check_property(prop_name, prop_metadata, kwargs) + self._check_property(prop_name, prop_metadata, setting_kwargs) - self._inner = kwargs + self._inner = setting_kwargs if self.granular_markings: for m in self.granular_markings: @@ -99,3 +108,37 @@ class _STIXBase(collections.Mapping): props = [(k, self[k]) for k in sorted(self._properties) if self.get(k)] return "{0}({1})".format(self.__class__.__name__, ", ".join(["{0!s}={1!r}".format(k, v) for k, v in props])) + + def __deepcopy__(self, memo): + # Assumption: we can ignore the memo argument, because no object will ever contain the same sub-object multiple times. + new_inner = copy.deepcopy(self._inner, memo) + cls = type(self) + return cls(**new_inner) + +# Versioning API + + def new_version(self, **kwargs): + unchangable_properties = [] + if self.revoked: + raise RevokeError("new_version") + new_obj_inner = copy.deepcopy(self._inner) + properties_to_change = kwargs.keys() + for prop in ["created", "created_by_ref", "id", "type"]: + if prop in properties_to_change: + unchangable_properties.append(prop) + if unchangable_properties: + raise UnmodifiablePropertyError(unchangable_properties) + cls = type(self) + if 'modified' not in kwargs: + kwargs['modified'] = get_timestamp() + else: + new_modified_property = parse_into_datetime(kwargs['modified']) + if new_modified_property < self.modified: + raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.") + new_obj_inner.update(kwargs) + return cls(**new_obj_inner) + + def revoke(self): + if self.revoked: + raise RevokeError("revoke") + return self.new_version(revoked=True) diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 2606796..20e44a7 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -25,7 +25,7 @@ class MissingFieldsError(STIXError, ValueError): self.fields = sorted(list(fields)) def __str__(self): - msg = "Missing required field(s) for {0}: ({1})." + msg = "No values for required field(s) for {0}: ({1})." return msg.format(self.cls.__name__, ", ".join(x for x in self.fields)) @@ -49,3 +49,29 @@ class ImmutableError(STIXError, ValueError): def __init__(self): super(ImmutableError, self).__init__("Cannot modify properties after creation.") + + +class UnmodifiablePropertyError(STIXError, ValueError): + """Attempted to modify an unmodifiable property of object when creating a new version""" + + def __init__(self, unchangable_properties): + super(UnmodifiablePropertyError, self).__init__() + self.unchangable_properties = unchangable_properties + + def __str__(self): + msg = "These properties cannot be changed when making a new version: {0}." + return msg.format(", ".join(self.unchangable_properties)) + + +class RevokeError(STIXError, ValueError): + """Attempted to an operation on a revoked object""" + + def __init__(self, called_by): + super(RevokeError, self).__init__() + self.called_by = called_by + + def __str__(self): + if self.called_by == "revoke": + return "Cannot revoke an already revoked object." + else: + return "Cannot create a new version of a revoked object." diff --git a/stix2/test/constants.py b/stix2/test/constants.py index 1c8ae2b..b631d08 100644 --- a/stix2/test/constants.py +++ b/stix2/test/constants.py @@ -39,6 +39,7 @@ RELATIONSHIP_KWARGS = dict( target_ref=MALWARE_ID, ) +# Minimum required args for a Sighting instance SIGHTING_KWARGS = dict( sighting_of_ref=INDICATOR_ID, ) diff --git a/stix2/test/test_external_reference.py b/stix2/test/test_external_reference.py index f8d9b66..1e25fd2 100644 --- a/stix2/test/test_external_reference.py +++ b/stix2/test/test_external_reference.py @@ -113,4 +113,3 @@ def test_external_reference_source_required(): assert excinfo.value.cls == stix2.ExternalReference assert excinfo.value.fields == ["source_name"] - assert str(excinfo.value) == "Missing required field(s) for ExternalReference: (source_name)." diff --git a/stix2/test/test_indicator.py b/stix2/test/test_indicator.py index 1c2b610..baf4b38 100644 --- a/stix2/test/test_indicator.py +++ b/stix2/test/test_indicator.py @@ -93,7 +93,7 @@ def test_indicator_required_fields(): assert excinfo.value.cls == stix2.Indicator assert excinfo.value.fields == ["labels", "pattern"] - assert str(excinfo.value) == "Missing required field(s) for Indicator: (labels, pattern)." + assert str(excinfo.value) == "No values for required field(s) for Indicator: (labels, pattern)." def test_indicator_required_field_pattern(): @@ -102,7 +102,6 @@ def test_indicator_required_field_pattern(): assert excinfo.value.cls == stix2.Indicator assert excinfo.value.fields == ["pattern"] - assert str(excinfo.value) == "Missing required field(s) for Indicator: (pattern)." def test_indicator_created_ref_invalid_format(): diff --git a/stix2/test/test_kill_chain_phases.py b/stix2/test/test_kill_chain_phases.py index d2ecc1f..037e930 100644 --- a/stix2/test/test_kill_chain_phases.py +++ b/stix2/test/test_kill_chain_phases.py @@ -41,7 +41,6 @@ def test_kill_chain_required_fields(): assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.fields == ["kill_chain_name", "phase_name"] - assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (kill_chain_name, phase_name)." def test_kill_chain_required_field_chain_name(): @@ -51,7 +50,6 @@ def test_kill_chain_required_field_chain_name(): assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.fields == ["kill_chain_name"] - assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (kill_chain_name)." def test_kill_chain_required_field_phase_name(): @@ -61,4 +59,3 @@ def test_kill_chain_required_field_phase_name(): assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.fields == ["phase_name"] - assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (phase_name)." diff --git a/stix2/test/test_malware.py b/stix2/test/test_malware.py index edc35b6..f3bfccd 100644 --- a/stix2/test/test_malware.py +++ b/stix2/test/test_malware.py @@ -76,7 +76,6 @@ def test_malware_required_fields(): assert excinfo.value.cls == stix2.Malware assert excinfo.value.fields == ["labels", "name"] - assert str(excinfo.value) == "Missing required field(s) for Malware: (labels, name)." def test_malware_required_field_name(): @@ -85,7 +84,6 @@ def test_malware_required_field_name(): assert excinfo.value.cls == stix2.Malware assert excinfo.value.fields == ["name"] - assert str(excinfo.value) == "Missing required field(s) for Malware: (name)." def test_cannot_assign_to_malware_attributes(malware): diff --git a/stix2/test/test_relationship.py b/stix2/test/test_relationship.py index 72e659c..869107d 100644 --- a/stix2/test/test_relationship.py +++ b/stix2/test/test_relationship.py @@ -76,13 +76,16 @@ def test_relationship_id_must_start_with_relationship(): def test_relationship_required_field_relationship_type(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Relationship() - assert str(excinfo.value) == "Missing required field(s) for Relationship: (relationship_type, source_ref, target_ref)." + assert excinfo.value.cls == stix2.Relationship + assert excinfo.value.fields == ["relationship_type", "source_ref", "target_ref"] def test_relationship_missing_some_required_fields(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Relationship(relationship_type='indicates') - assert str(excinfo.value) == "Missing required field(s) for Relationship: (source_ref, target_ref)." + + assert excinfo.value.cls == stix2.Relationship + assert excinfo.value.fields == ["source_ref", "target_ref"] def test_relationship_required_field_target_ref(): @@ -94,7 +97,6 @@ def test_relationship_required_field_target_ref(): assert excinfo.value.cls == stix2.Relationship assert excinfo.value.fields == ["target_ref"] - assert str(excinfo.value) == "Missing required field(s) for Relationship: (target_ref)." def test_cannot_assign_to_relationship_attributes(relationship): diff --git a/stix2/test/test_versioning.py b/stix2/test/test_versioning.py new file mode 100644 index 0000000..4663263 --- /dev/null +++ b/stix2/test/test_versioning.py @@ -0,0 +1,180 @@ +import pytest +import stix2 + + +def test_making_new_version(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + campaign_v2 = campaign_v1.new_version(name="fred") + + assert campaign_v1.id == campaign_v2.id + assert campaign_v1.created_by_ref == campaign_v2.created_by_ref + assert campaign_v1.created == campaign_v2.created + assert campaign_v1.name != campaign_v2.name + assert campaign_v2.name == "fred" + assert campaign_v1.description == campaign_v2.description + assert campaign_v1.modified < campaign_v2.modified + + +def test_making_new_version_with_unset(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + campaign_v2 = campaign_v1.new_version(description=None) + + assert campaign_v1.id == campaign_v2.id + assert campaign_v1.created_by_ref == campaign_v2.created_by_ref + assert campaign_v1.created == campaign_v2.created + assert campaign_v1.name == campaign_v2.name + assert campaign_v2.description is None + assert campaign_v1.modified < campaign_v2.modified + + +def test_making_new_version_with_embedded_object(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + external_references=[{ + "source_name": "capec", + "external_id": "CAPEC-163" + }], + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + campaign_v2 = campaign_v1.new_version(external_references=[{ + "source_name": "capec", + "external_id": "CAPEC-164" + }]) + + assert campaign_v1.id == campaign_v2.id + assert campaign_v1.created_by_ref == campaign_v2.created_by_ref + assert campaign_v1.created == campaign_v2.created + assert campaign_v1.name == campaign_v2.name + assert campaign_v1.description == campaign_v2.description + assert campaign_v1.modified < campaign_v2.modified + assert campaign_v1.external_references[0].external_id != campaign_v2.external_references[0].external_id + + +def test_revoke(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + campaign_v2 = campaign_v1.revoke() + + assert campaign_v1.id == campaign_v2.id + assert campaign_v1.created_by_ref == campaign_v2.created_by_ref + assert campaign_v1.created == campaign_v2.created + assert campaign_v1.name == campaign_v2.name + assert campaign_v1.description == campaign_v2.description + assert campaign_v1.modified < campaign_v2.modified + + assert campaign_v2.revoked + + +def test_versioning_error_invalid_property(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as excinfo: + campaign_v1.new_version(type="threat-actor") + + assert str(excinfo.value) == "These properties cannot be changed when making a new version: type." + + +def test_versioning_error_bad_modified_value(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + campaign_v1.new_version(modified="2015-04-06T20:03:00.000Z") + + assert excinfo.value.cls == stix2.Campaign + assert excinfo.value.prop_name == "modified" + assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime." + + +def test_versioning_error_usetting_required_property(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: + campaign_v1.new_version(name=None) + + assert excinfo.value.cls == stix2.Campaign + assert excinfo.value.fields == ["name"] + + +def test_versioning_error_new_version_of_revoked(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + campaign_v2 = campaign_v1.revoke() + + with pytest.raises(stix2.exceptions.RevokeError) as excinfo: + campaign_v2.new_version(name="barney") + + assert excinfo.value.called_by == "new_version" + + +def test_versioning_error_revoke_of_revoked(): + campaign_v1 = stix2.Campaign( + id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T20:03:00.000Z", + modified="2016-04-06T20:03:00.000Z", + name="Green Group Attacks Against Finance", + description="Campaign by Green Group against a series of targets in the financial services sector." + ) + + campaign_v2 = campaign_v1.revoke() + + with pytest.raises(stix2.exceptions.RevokeError) as excinfo: + campaign_v2.revoke() + + assert excinfo.value.called_by == "revoke" diff --git a/stix2/utils.py b/stix2/utils.py index cdc7f54..a2493e2 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -3,6 +3,7 @@ import datetime as dt import json +from dateutil import parser import pytz # Sentinel value for fields that should be set to the current time. @@ -34,6 +35,28 @@ def format_datetime(dttm): return ts + "Z" +def parse_into_datetime(value): + if isinstance(value, dt.date): + if hasattr(value, 'hour'): + return value + else: + # Add a time component + return dt.datetime.combine(value, dt.time(), tzinfo=pytz.utc) + + # value isn't a date or datetime object so assume it's a string + try: + parsed = parser.parse(value) + except TypeError: + # Unknown format + raise ValueError("must be a datetime object, date object, or " + "timestamp string in a recognizable format.") + if parsed.tzinfo: + return parsed.astimezone(pytz.utc) + else: + # Doesn't have timezone info in the string; assume UTC + return pytz.utc.localize(parsed) + + def get_dict(data): """Return data as a dictionary. Input can be a dictionary, string, or file-like object.