added unsetting capability

cleaned up MissingFieldsError tests
error when new modified property is earlier than current modified property
stix2.1
Richard Piazza 2017-05-04 16:34:08 -04:00
parent 5b8585b392
commit 200bb8556f
10 changed files with 107 additions and 20 deletions

View File

@ -3,12 +3,13 @@
import collections import collections
import copy import copy
import datetime as dt import datetime as dt
import json import json
from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \ from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \
MissingFieldsError, RevokeError, UnmodifiablePropertyError MissingFieldsError, RevokeError, UnmodifiablePropertyError
from .utils import format_datetime, get_timestamp, NOW from .utils import format_datetime, get_timestamp, NOW, parse_into_datetime
__all__ = ['STIXJSONEncoder', '_STIXBase'] __all__ = ['STIXJSONEncoder', '_STIXBase']
@ -58,16 +59,22 @@ class _STIXBase(collections.Mapping):
if extra_kwargs: if extra_kwargs:
raise ExtraFieldsError(cls, extra_kwargs) raise ExtraFieldsError(cls, extra_kwargs)
# Remove any keyword arguments whose value is None
setting_kwargs = {}
for prop_name, prop_value in kwargs.items():
if prop_value:
setting_kwargs[prop_name] = prop_value
# Detect any missing required fields # Detect any missing required fields
required_fields = get_required_properties(cls._properties) required_fields = get_required_properties(cls._properties)
missing_kwargs = set(required_fields) - set(kwargs) missing_kwargs = set(required_fields) - set(setting_kwargs)
if missing_kwargs: if missing_kwargs:
raise MissingFieldsError(cls, missing_kwargs) raise MissingFieldsError(cls, missing_kwargs)
for prop_name, prop_metadata in cls._properties.items(): for prop_name, prop_metadata in cls._properties.items():
self._check_property(prop_name, prop_metadata, kwargs) self._check_property(prop_name, prop_metadata, setting_kwargs)
self._inner = kwargs self._inner = setting_kwargs
if self.granular_markings: if self.granular_markings:
for m in self.granular_markings: for m in self.granular_markings:
@ -121,10 +128,14 @@ class _STIXBase(collections.Mapping):
unchangable_properties.append(prop) unchangable_properties.append(prop)
if unchangable_properties: if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties) raise UnmodifiablePropertyError(unchangable_properties)
cls = type(self)
if 'modified' not in kwargs: if 'modified' not in kwargs:
kwargs['modified'] = get_timestamp() kwargs['modified'] = get_timestamp()
else:
new_modified_property = parse_into_datetime(kwargs['modified'])
if new_modified_property < self.modified:
raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.")
new_obj_inner.update(kwargs) new_obj_inner.update(kwargs)
cls = type(self)
return cls(**new_obj_inner) return cls(**new_obj_inner)
def revoke(self): def revoke(self):

View File

@ -25,7 +25,7 @@ class MissingFieldsError(STIXError, ValueError):
self.fields = sorted(list(fields)) self.fields = sorted(list(fields))
def __str__(self): def __str__(self):
msg = "Missing required field(s) for {0}: ({1})." msg = "No values for required field(s) for {0}: ({1})."
return msg.format(self.cls.__name__, return msg.format(self.cls.__name__,
", ".join(x for x in self.fields)) ", ".join(x for x in self.fields))

View File

@ -39,6 +39,9 @@ RELATIONSHIP_KWARGS = dict(
target_ref=MALWARE_ID, target_ref=MALWARE_ID,
) )
# Minimum required args for a Sighting instance
SIGHTING_KWARGS = dict( SIGHTING_KWARGS = dict(
sighting_of_ref=INDICATOR_ID, sighting_of_ref=INDICATOR_ID,
) )

View File

@ -113,4 +113,3 @@ def test_external_reference_source_required():
assert excinfo.value.cls == stix2.ExternalReference assert excinfo.value.cls == stix2.ExternalReference
assert excinfo.value.fields == ["source_name"] assert excinfo.value.fields == ["source_name"]
assert str(excinfo.value) == "Missing required field(s) for ExternalReference: (source_name)."

View File

@ -93,7 +93,7 @@ def test_indicator_required_fields():
assert excinfo.value.cls == stix2.Indicator assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ["labels", "pattern"] assert excinfo.value.fields == ["labels", "pattern"]
assert str(excinfo.value) == "Missing required field(s) for Indicator: (labels, pattern)." assert str(excinfo.value) == "No values for required field(s) for Indicator: (labels, pattern)."
def test_indicator_required_field_pattern(): def test_indicator_required_field_pattern():
@ -102,7 +102,6 @@ def test_indicator_required_field_pattern():
assert excinfo.value.cls == stix2.Indicator assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ["pattern"] assert excinfo.value.fields == ["pattern"]
assert str(excinfo.value) == "Missing required field(s) for Indicator: (pattern)."
def test_indicator_created_ref_invalid_format(): def test_indicator_created_ref_invalid_format():

View File

@ -41,7 +41,6 @@ def test_kill_chain_required_fields():
assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["kill_chain_name", "phase_name"] assert excinfo.value.fields == ["kill_chain_name", "phase_name"]
assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (kill_chain_name, phase_name)."
def test_kill_chain_required_field_chain_name(): def test_kill_chain_required_field_chain_name():
@ -51,7 +50,6 @@ def test_kill_chain_required_field_chain_name():
assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["kill_chain_name"] assert excinfo.value.fields == ["kill_chain_name"]
assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (kill_chain_name)."
def test_kill_chain_required_field_phase_name(): def test_kill_chain_required_field_phase_name():
@ -61,4 +59,3 @@ def test_kill_chain_required_field_phase_name():
assert excinfo.value.cls == stix2.KillChainPhase assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["phase_name"] assert excinfo.value.fields == ["phase_name"]
assert str(excinfo.value) == "Missing required field(s) for KillChainPhase: (phase_name)."

View File

@ -76,7 +76,6 @@ def test_malware_required_fields():
assert excinfo.value.cls == stix2.Malware assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ["labels", "name"] assert excinfo.value.fields == ["labels", "name"]
assert str(excinfo.value) == "Missing required field(s) for Malware: (labels, name)."
def test_malware_required_field_name(): def test_malware_required_field_name():
@ -85,7 +84,6 @@ def test_malware_required_field_name():
assert excinfo.value.cls == stix2.Malware assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ["name"] assert excinfo.value.fields == ["name"]
assert str(excinfo.value) == "Missing required field(s) for Malware: (name)."
def test_cannot_assign_to_malware_attributes(malware): def test_cannot_assign_to_malware_attributes(malware):

View File

@ -76,13 +76,16 @@ def test_relationship_id_must_start_with_relationship():
def test_relationship_required_field_relationship_type(): def test_relationship_required_field_relationship_type():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship() stix2.Relationship()
assert str(excinfo.value) == "Missing required field(s) for Relationship: (relationship_type, source_ref, target_ref)." assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["relationship_type", "source_ref", "target_ref"]
def test_relationship_missing_some_required_fields(): def test_relationship_missing_some_required_fields():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship(relationship_type='indicates') stix2.Relationship(relationship_type='indicates')
assert str(excinfo.value) == "Missing required field(s) for Relationship: (source_ref, target_ref)."
assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["source_ref", "target_ref"]
def test_relationship_required_field_target_ref(): def test_relationship_required_field_target_ref():
@ -94,7 +97,6 @@ def test_relationship_required_field_target_ref():
assert excinfo.value.cls == stix2.Relationship assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["target_ref"] assert excinfo.value.fields == ["target_ref"]
assert str(excinfo.value) == "Missing required field(s) for Relationship: (target_ref)."
def test_cannot_assign_to_relationship_attributes(relationship): def test_cannot_assign_to_relationship_attributes(relationship):

View File

@ -23,6 +23,26 @@ def test_making_new_version():
assert campaign_v1.modified < campaign_v2.modified assert campaign_v1.modified < campaign_v2.modified
def test_making_new_version_with_unset():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
campaign_v2 = campaign_v1.new_version(description=None)
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.created_by_ref == campaign_v2.created_by_ref
assert campaign_v1.created == campaign_v2.created
assert campaign_v1.name == campaign_v2.name
assert campaign_v2.description == None
assert campaign_v1.modified < campaign_v2.modified
def test_making_new_version_with_embedded_object(): def test_making_new_version_with_embedded_object():
campaign_v1 = stix2.Campaign( campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
@ -86,7 +106,42 @@ def test_versioning_error_invalid_property():
with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as excinfo: with pytest.raises(stix2.exceptions.UnmodifiablePropertyError) as excinfo:
campaign_v1.new_version(type="threat-actor") campaign_v1.new_version(type="threat-actor")
str(excinfo.value) == "These properties cannot be changed when making a new version: type" assert str(excinfo.value) == "These properties cannot be changed when making a new version: type."
def test_versioning_error_bad_modified_value():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
campaign_v1.new_version(modified="2015-04-06T20:03:00.000Z")
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.prop_name == "modified"
assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime."
def test_versioning_error_usetting_required_property():
campaign_v1 = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00.000Z",
modified="2016-04-06T20:03:00.000Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector."
)
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
campaign_v1.new_version(name=None)
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.fields == [ "name" ]
def test_versioning_error_new_version_of_revoked(): def test_versioning_error_new_version_of_revoked():
@ -104,7 +159,7 @@ def test_versioning_error_new_version_of_revoked():
with pytest.raises(stix2.exceptions.RevokeError) as excinfo: with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.new_version(name="barney") campaign_v2.new_version(name="barney")
str(excinfo.value) == "Cannot create a new version of a revoked object" assert excinfo.value.called_by == "new_version"
def test_versioning_error_revoke_of_revoked(): def test_versioning_error_revoke_of_revoked():
@ -122,4 +177,4 @@ def test_versioning_error_revoke_of_revoked():
with pytest.raises(stix2.exceptions.RevokeError) as excinfo: with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.revoke() campaign_v2.revoke()
str(excinfo.value) == "Cannot revoke an already revoked object" assert excinfo.value.called_by == "revoke"

View File

@ -1,6 +1,7 @@
"""Utility functions and classes for the stix2 library.""" """Utility functions and classes for the stix2 library."""
import datetime as dt import datetime as dt
from dateutil import parser
import json import json
import pytz import pytz
@ -34,6 +35,28 @@ def format_datetime(dttm):
return ts + "Z" return ts + "Z"
def parse_into_datetime(value):
if isinstance(value, dt.date):
if hasattr(value, 'hour'):
return value
else:
# Add a time component
return dt.datetime.combine(value, dt.time(), tzinfo=pytz.utc)
# value isn't a date or datetime object so assume it's a string
try:
parsed = parser.parse(value)
except TypeError:
# Unknown format
raise ValueError("must be a datetime object, date object, or "
"timestamp string in a recognizable format.")
if parsed.tzinfo:
return parsed.astimezone(pytz.utc)
else:
# Doesn't have timezone info in the string; assume UTC
return pytz.utc.localize(parsed)
def get_dict(data): def get_dict(data):
"""Return data as a dictionary. """Return data as a dictionary.
Input can be a dictionary, string, or file-like object. Input can be a dictionary, string, or file-like object.