parent
4aa69fa7c9
commit
a9ac7ce838
|
@ -331,7 +331,7 @@ class TimestampProperty(Property):
|
||||||
|
|
||||||
def clean(self, value):
|
def clean(self, value):
|
||||||
return parse_into_datetime(
|
return parse_into_datetime(
|
||||||
value, self.precision, self.precision_constraint
|
value, self.precision, self.precision_constraint,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
import datetime
|
import datetime
|
||||||
import pytest
|
|
||||||
from stix2.utils import (
|
|
||||||
Precision, PrecisionConstraint, _to_enum, parse_into_datetime,
|
|
||||||
format_datetime, STIXdatetime
|
|
||||||
)
|
|
||||||
import stix2
|
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import stix2
|
||||||
|
from stix2.utils import (
|
||||||
|
Precision, PrecisionConstraint, STIXdatetime, _to_enum, format_datetime,
|
||||||
|
parse_into_datetime,
|
||||||
|
)
|
||||||
|
|
||||||
_DT = datetime.datetime.utcnow()
|
_DT = datetime.datetime.utcnow()
|
||||||
# intentionally omit microseconds from the following. We add it in as
|
# intentionally omit microseconds from the following. We add it in as
|
||||||
|
@ -13,24 +14,30 @@ _DT = datetime.datetime.utcnow()
|
||||||
_DT_STR = _DT.strftime("%Y-%m-%dT%H:%M:%S")
|
_DT_STR = _DT.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("value, enum_type, enum_default, enum_expected", [
|
@pytest.mark.parametrize(
|
||||||
("second", Precision, None, Precision.SECOND),
|
"value, enum_type, enum_default, enum_expected", [
|
||||||
("eXaCt", PrecisionConstraint, PrecisionConstraint.MIN,
|
("second", Precision, None, Precision.SECOND),
|
||||||
PrecisionConstraint.EXACT),
|
(
|
||||||
(None, Precision, Precision.MILLISECOND, Precision.MILLISECOND),
|
"eXaCt", PrecisionConstraint, PrecisionConstraint.MIN,
|
||||||
(Precision.ANY, Precision, None, Precision.ANY)
|
PrecisionConstraint.EXACT
|
||||||
])
|
),
|
||||||
|
(None, Precision, Precision.MILLISECOND, Precision.MILLISECOND),
|
||||||
|
(Precision.ANY, Precision, None, Precision.ANY),
|
||||||
|
],
|
||||||
|
)
|
||||||
def test_to_enum(value, enum_type, enum_default, enum_expected):
|
def test_to_enum(value, enum_type, enum_default, enum_expected):
|
||||||
result = _to_enum(value, enum_type, enum_default)
|
result = _to_enum(value, enum_type, enum_default)
|
||||||
assert result == enum_expected
|
assert result == enum_expected
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("value, err_type", [
|
@pytest.mark.parametrize(
|
||||||
("foo", KeyError),
|
"value, err_type", [
|
||||||
(1, TypeError),
|
("foo", KeyError),
|
||||||
(PrecisionConstraint.EXACT, TypeError),
|
(1, TypeError),
|
||||||
(None, TypeError)
|
(PrecisionConstraint.EXACT, TypeError),
|
||||||
])
|
(None, TypeError),
|
||||||
|
],
|
||||||
|
)
|
||||||
def test_to_enum_errors(value, err_type):
|
def test_to_enum_errors(value, err_type):
|
||||||
with pytest.raises(err_type):
|
with pytest.raises(err_type):
|
||||||
_to_enum(value, Precision)
|
_to_enum(value, Precision)
|
||||||
|
@ -47,7 +54,7 @@ def test_stix_datetime():
|
||||||
|
|
||||||
sdt = STIXdatetime(
|
sdt = STIXdatetime(
|
||||||
dt,
|
dt,
|
||||||
precision_constraint=PrecisionConstraint.EXACT
|
precision_constraint=PrecisionConstraint.EXACT,
|
||||||
)
|
)
|
||||||
assert sdt.precision_constraint is PrecisionConstraint.EXACT
|
assert sdt.precision_constraint is PrecisionConstraint.EXACT
|
||||||
assert sdt == dt
|
assert sdt == dt
|
||||||
|
@ -61,10 +68,11 @@ def test_stix_datetime():
|
||||||
(123456, Precision.MILLISECOND, PrecisionConstraint.EXACT, 123000),
|
(123456, Precision.MILLISECOND, PrecisionConstraint.EXACT, 123000),
|
||||||
(123456, Precision.MILLISECOND, PrecisionConstraint.MIN, 123456),
|
(123456, Precision.MILLISECOND, PrecisionConstraint.MIN, 123456),
|
||||||
(1234, Precision.MILLISECOND, PrecisionConstraint.EXACT, 1000),
|
(1234, Precision.MILLISECOND, PrecisionConstraint.EXACT, 1000),
|
||||||
(123, Precision.MILLISECOND, PrecisionConstraint.EXACT, 0)
|
(123, Precision.MILLISECOND, PrecisionConstraint.EXACT, 0),
|
||||||
])
|
],
|
||||||
|
)
|
||||||
def test_parse_datetime(
|
def test_parse_datetime(
|
||||||
us, precision, precision_constraint, expected_truncated_us
|
us, precision, precision_constraint, expected_truncated_us,
|
||||||
):
|
):
|
||||||
|
|
||||||
# complete the datetime string with microseconds
|
# complete the datetime string with microseconds
|
||||||
|
@ -73,7 +81,7 @@ def test_parse_datetime(
|
||||||
sdt = parse_into_datetime(
|
sdt = parse_into_datetime(
|
||||||
dt_us_str,
|
dt_us_str,
|
||||||
precision=precision,
|
precision=precision,
|
||||||
precision_constraint=precision_constraint
|
precision_constraint=precision_constraint,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert sdt.precision is precision
|
assert sdt.precision is precision
|
||||||
|
@ -100,7 +108,8 @@ def test_parse_datetime(
|
||||||
(1001, Precision.MILLISECOND, PrecisionConstraint.MIN, ".001001"),
|
(1001, Precision.MILLISECOND, PrecisionConstraint.MIN, ".001001"),
|
||||||
(10010, Precision.MILLISECOND, PrecisionConstraint.MIN, ".01001"),
|
(10010, Precision.MILLISECOND, PrecisionConstraint.MIN, ".01001"),
|
||||||
(100100, Precision.MILLISECOND, PrecisionConstraint.MIN, ".1001"),
|
(100100, Precision.MILLISECOND, PrecisionConstraint.MIN, ".1001"),
|
||||||
])
|
],
|
||||||
|
)
|
||||||
def test_format_datetime(us, precision, precision_constraint, expected_us_str):
|
def test_format_datetime(us, precision, precision_constraint, expected_us_str):
|
||||||
|
|
||||||
dt = _DT.replace(microsecond=us)
|
dt = _DT.replace(microsecond=us)
|
||||||
|
@ -109,7 +118,7 @@ def test_format_datetime(us, precision, precision_constraint, expected_us_str):
|
||||||
sdt = STIXdatetime(
|
sdt = STIXdatetime(
|
||||||
dt,
|
dt,
|
||||||
precision=precision,
|
precision=precision,
|
||||||
precision_constraint=precision_constraint
|
precision_constraint=precision_constraint,
|
||||||
)
|
)
|
||||||
s = format_datetime(sdt)
|
s = format_datetime(sdt)
|
||||||
assert s == expected_dt_str
|
assert s == expected_dt_str
|
||||||
|
@ -124,7 +133,7 @@ def test_sdo_extra_precision():
|
||||||
"modified" :"2015-12-21T19:59:11.0001Z",
|
"modified" :"2015-12-21T19:59:11.0001Z",
|
||||||
"name" :"John Smith",
|
"name" :"John Smith",
|
||||||
"identity_class" :"individual",
|
"identity_class" :"individual",
|
||||||
"spec_version": "2.1"
|
"spec_version": "2.1",
|
||||||
}
|
}
|
||||||
|
|
||||||
identity_obj = stix2.parse(identity_dict)
|
identity_obj = stix2.parse(identity_dict)
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import stix2
|
import stix2
|
||||||
|
@ -267,28 +268,30 @@ def test_remove_custom_stix_no_custom():
|
||||||
assert campaign_v1.description == campaign_v2.description
|
assert campaign_v1.description == campaign_v2.description
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("old, candidate_new, expected_new, use_stix21", [
|
@pytest.mark.parametrize(
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.001Z", "1999-08-15T00:19:07.001Z", False),
|
"old, candidate_new, expected_new, use_stix21", [
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.001Z", False),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.001Z", "1999-08-15T00:19:07.001Z", False),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.000Z", "1999-08-15T00:19:07.001Z", False),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.001Z", False),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.999Z", "1999-08-15T00:19:07.001Z", False),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.000Z", "1999-08-15T00:19:07.001Z", False),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.0001Z", "1999-08-15T00:19:07.001Z", False),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.999Z", "1999-08-15T00:19:07.001Z", False),
|
||||||
("1999-08-15T00:19:07.999Z", "1999-08-15T00:19:07.9999Z", "1999-08-15T00:19:08.000Z", False),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.0001Z", "1999-08-15T00:19:07.001Z", False),
|
||||||
|
("1999-08-15T00:19:07.999Z", "1999-08-15T00:19:07.9999Z", "1999-08-15T00:19:08.000Z", False),
|
||||||
|
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.001Z", "1999-08-15T00:19:07.001Z", True),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.001Z", "1999-08-15T00:19:07.001Z", True),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000001Z", True),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000001Z", True),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.000Z", "1999-08-15T00:19:07.000001Z", True),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.000Z", "1999-08-15T00:19:07.000001Z", True),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.999999Z", "1999-08-15T00:19:07.000001Z", True),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:06.999999Z", "1999-08-15T00:19:07.000001Z", True),
|
||||||
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000001Z", "1999-08-15T00:19:07.000001Z", True),
|
("1999-08-15T00:19:07.000Z", "1999-08-15T00:19:07.000001Z", "1999-08-15T00:19:07.000001Z", True),
|
||||||
("1999-08-15T00:19:07.999Z", "1999-08-15T00:19:07.999999Z", "1999-08-15T00:19:07.999999Z", True),
|
("1999-08-15T00:19:07.999Z", "1999-08-15T00:19:07.999999Z", "1999-08-15T00:19:07.999999Z", True),
|
||||||
])
|
],
|
||||||
|
)
|
||||||
def test_fudge_modified(old, candidate_new, expected_new, use_stix21):
|
def test_fudge_modified(old, candidate_new, expected_new, use_stix21):
|
||||||
old_dt = datetime.datetime.strptime(old, "%Y-%m-%dT%H:%M:%S.%fZ")
|
old_dt = datetime.datetime.strptime(old, "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||||
candidate_new_dt = datetime.datetime.strptime(
|
candidate_new_dt = datetime.datetime.strptime(
|
||||||
candidate_new, "%Y-%m-%dT%H:%M:%S.%fZ"
|
candidate_new, "%Y-%m-%dT%H:%M:%S.%fZ",
|
||||||
)
|
)
|
||||||
expected_new_dt = datetime.datetime.strptime(
|
expected_new_dt = datetime.datetime.strptime(
|
||||||
expected_new, "%Y-%m-%dT%H:%M:%S.%fZ"
|
expected_new, "%Y-%m-%dT%H:%M:%S.%fZ",
|
||||||
)
|
)
|
||||||
|
|
||||||
fudged = stix2.utils._fudge_modified(old_dt, candidate_new_dt, use_stix21)
|
fudged = stix2.utils._fudge_modified(old_dt, candidate_new_dt, use_stix21)
|
||||||
|
|
|
@ -8,10 +8,10 @@ import copy
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import enum
|
import enum
|
||||||
import json
|
import json
|
||||||
import six
|
|
||||||
|
|
||||||
from dateutil import parser
|
from dateutil import parser
|
||||||
import pytz
|
import pytz
|
||||||
|
import six
|
||||||
|
|
||||||
import stix2.base
|
import stix2.base
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ def _to_enum(value, enum_type, enum_default=None):
|
||||||
value = enum_type[value.upper()]
|
value = enum_type[value.upper()]
|
||||||
else:
|
else:
|
||||||
raise TypeError("Not a valid {}: {}".format(
|
raise TypeError("Not a valid {}: {}".format(
|
||||||
enum_type.__name__, value
|
enum_type.__name__, value,
|
||||||
))
|
))
|
||||||
|
|
||||||
return value
|
return value
|
||||||
|
@ -97,11 +97,11 @@ class STIXdatetime(dt.datetime):
|
||||||
def __new__(cls, *args, **kwargs):
|
def __new__(cls, *args, **kwargs):
|
||||||
precision = _to_enum(
|
precision = _to_enum(
|
||||||
kwargs.pop("precision", Precision.ANY),
|
kwargs.pop("precision", Precision.ANY),
|
||||||
Precision
|
Precision,
|
||||||
)
|
)
|
||||||
precision_constraint = _to_enum(
|
precision_constraint = _to_enum(
|
||||||
kwargs.pop("precision_constraint", PrecisionConstraint.EXACT),
|
kwargs.pop("precision_constraint", PrecisionConstraint.EXACT),
|
||||||
PrecisionConstraint
|
PrecisionConstraint,
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(args[0], dt.datetime): # Allow passing in a datetime object
|
if isinstance(args[0], dt.datetime): # Allow passing in a datetime object
|
||||||
|
@ -176,7 +176,7 @@ def format_datetime(dttm):
|
||||||
ts = zoned.strftime('%Y-%m-%dT%H:%M:%S')
|
ts = zoned.strftime('%Y-%m-%dT%H:%M:%S')
|
||||||
precision = getattr(dttm, 'precision', Precision.ANY)
|
precision = getattr(dttm, 'precision', Precision.ANY)
|
||||||
precision_constraint = getattr(
|
precision_constraint = getattr(
|
||||||
dttm, 'precision_constraint', PrecisionConstraint.EXACT
|
dttm, 'precision_constraint', PrecisionConstraint.EXACT,
|
||||||
)
|
)
|
||||||
|
|
||||||
frac_seconds_str = ""
|
frac_seconds_str = ""
|
||||||
|
@ -212,7 +212,7 @@ def format_datetime(dttm):
|
||||||
ts = "{}{}{}Z".format(
|
ts = "{}{}{}Z".format(
|
||||||
ts,
|
ts,
|
||||||
"." if frac_seconds_str else "",
|
"." if frac_seconds_str else "",
|
||||||
frac_seconds_str
|
frac_seconds_str,
|
||||||
)
|
)
|
||||||
|
|
||||||
return ts
|
return ts
|
||||||
|
@ -220,7 +220,7 @@ def format_datetime(dttm):
|
||||||
|
|
||||||
def parse_into_datetime(
|
def parse_into_datetime(
|
||||||
value, precision=Precision.ANY,
|
value, precision=Precision.ANY,
|
||||||
precision_constraint=PrecisionConstraint.EXACT
|
precision_constraint=PrecisionConstraint.EXACT,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Parse a value into a valid STIX timestamp object. Also, optionally adjust
|
Parse a value into a valid STIX timestamp object. Also, optionally adjust
|
||||||
|
@ -280,7 +280,7 @@ def parse_into_datetime(
|
||||||
# else, precision == Precision.ANY: nothing for us to do.
|
# else, precision == Precision.ANY: nothing for us to do.
|
||||||
|
|
||||||
return STIXdatetime(
|
return STIXdatetime(
|
||||||
ts, precision=precision, precision_constraint=precision_constraint
|
ts, precision=precision, precision_constraint=precision_constraint,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -455,7 +455,7 @@ def new_version(data, **kwargs):
|
||||||
if 'modified' not in kwargs:
|
if 'modified' not in kwargs:
|
||||||
old_modified = parse_into_datetime(
|
old_modified = parse_into_datetime(
|
||||||
data["modified"], precision="millisecond",
|
data["modified"], precision="millisecond",
|
||||||
precision_constraint=precision_constraint
|
precision_constraint=precision_constraint,
|
||||||
)
|
)
|
||||||
|
|
||||||
new_modified = get_timestamp()
|
new_modified = get_timestamp()
|
||||||
|
@ -466,11 +466,11 @@ def new_version(data, **kwargs):
|
||||||
elif 'modified' in data:
|
elif 'modified' in data:
|
||||||
old_modified_property = parse_into_datetime(
|
old_modified_property = parse_into_datetime(
|
||||||
data.get('modified'), precision='millisecond',
|
data.get('modified'), precision='millisecond',
|
||||||
precision_constraint=precision_constraint
|
precision_constraint=precision_constraint,
|
||||||
)
|
)
|
||||||
new_modified_property = parse_into_datetime(
|
new_modified_property = parse_into_datetime(
|
||||||
kwargs['modified'], precision='millisecond',
|
kwargs['modified'], precision='millisecond',
|
||||||
precision_constraint=precision_constraint
|
precision_constraint=precision_constraint,
|
||||||
)
|
)
|
||||||
if new_modified_property <= old_modified_property:
|
if new_modified_property <= old_modified_property:
|
||||||
raise InvalidValueError(
|
raise InvalidValueError(
|
||||||
|
|
Loading…
Reference in New Issue