cti-python-stix2/stix2/utils.py

247 lines
8.4 KiB
Python
Raw Normal View History

2017-02-10 22:35:02 +01:00
"""Utility functions and classes for the stix2 library."""
from collections import Mapping
import copy
2017-02-10 22:35:02 +01:00
import datetime as dt
2017-05-05 16:53:28 +02:00
import json
from dateutil import parser
import pytz
from .exceptions import (InvalidValueError, RevokeError,
UnmodifiablePropertyError)
# Sentinel value for properties that should be set to the current time.
2017-02-10 22:35:02 +01:00
# We can't use the standard 'default' approach, since if there are multiple
# timestamps in a single object, the timestamps will vary by a few microseconds.
NOW = object()
class STIXdatetime(dt.datetime):
def __new__(cls, *args, **kwargs):
precision = kwargs.pop('precision', None)
if isinstance(args[0], dt.datetime): # Allow passing in a datetime object
dttm = args[0]
args = (dttm.year, dttm.month, dttm.day, dttm.hour, dttm.minute,
dttm.second, dttm.microsecond, dttm.tzinfo)
# self will be an instance of STIXdatetime, not dt.datetime
self = dt.datetime.__new__(cls, *args, **kwargs)
self.precision = precision
return self
def __repr__(self):
return "'%s'" % format_datetime(self)
def deduplicate(stix_obj_list):
"""Deduplicate a list of STIX objects to a unique set
Reduces a set of STIX objects to unique set by looking
at 'id' and 'modified' fields - as a unique object version
is determined by the combination of those fields
Note: Be aware, as can be seen in the implementation
of deduplicate(),that if the "stix_obj_list" argument has
multiple STIX objects of the same version, the last object
version found in the list will be the one that is returned.
()
Args:
stix_obj_list (list): list of STIX objects (dicts)
Returns:
A list with a unique set of the passed list of STIX objects.
"""
unique_objs = {}
for obj in stix_obj_list:
unique_objs[(obj['id'], obj['modified'])] = obj
return list(unique_objs.values())
2017-02-10 22:35:02 +01:00
def get_timestamp():
"""Return a STIX timestamp of the current date and time."""
return STIXdatetime.now(tz=pytz.UTC)
2017-02-10 22:35:02 +01:00
def format_datetime(dttm):
"""Convert a datetime object into a valid STIX timestamp string.
1. Convert to timezone-aware
2. Convert to UTC
3. Format in ISO format
4. Ensure correct precision
a. Add subsecond value if non-zero and precision not defined
5. Add "Z"
"""
if dttm.tzinfo is None or dttm.tzinfo.utcoffset(dttm) is None:
# dttm is timezone-naive; assume UTC
zoned = pytz.utc.localize(dttm)
else:
zoned = dttm.astimezone(pytz.utc)
ts = zoned.strftime("%Y-%m-%dT%H:%M:%S")
ms = zoned.strftime("%f")
precision = getattr(dttm, "precision", None)
if precision == 'second':
pass # Alredy precise to the second
elif precision == "millisecond":
ts = ts + '.' + ms[:3]
elif zoned.microsecond > 0:
ts = ts + '.' + ms.rstrip("0")
return ts + "Z"
def parse_into_datetime(value, precision=None):
"""Parse a value into a valid STIX timestamp object.
"""
if isinstance(value, dt.date):
if hasattr(value, 'hour'):
ts = value
else:
# Add a time component
ts = dt.datetime.combine(value, dt.time(0, 0, tzinfo=pytz.utc))
else:
# value isn't a date or datetime object so assume it's a string
try:
parsed = parser.parse(value)
except (TypeError, ValueError):
# Unknown format
raise ValueError("must be a datetime object, date object, or "
"timestamp string in a recognizable format.")
if parsed.tzinfo:
ts = parsed.astimezone(pytz.utc)
else:
# Doesn't have timezone info in the string; assume UTC
ts = pytz.utc.localize(parsed)
# Ensure correct precision
if not precision:
return STIXdatetime(ts, precision=precision)
ms = ts.microsecond
if precision == 'second':
ts = ts.replace(microsecond=0)
elif precision == 'millisecond':
ms_len = len(str(ms))
if ms_len > 3:
# Truncate to millisecond precision
factor = 10 ** (ms_len - 3)
ts = ts.replace(microsecond=(ts.microsecond // factor) * factor)
else:
ts = ts.replace(microsecond=0)
return STIXdatetime(ts, precision=precision)
def get_dict(data):
"""Return data as a dictionary.
Input can be a dictionary, string, or file-like object.
"""
if type(data) is dict:
return data
else:
try:
return json.loads(data)
except TypeError:
pass
try:
return json.load(data)
except AttributeError:
pass
try:
return dict(data)
except (ValueError, TypeError):
raise ValueError("Cannot convert '%s' to dictionary." % str(data))
2017-08-15 19:40:47 +02:00
def find_property_index(obj, properties, tuple_to_find):
"""Recursively find the property in the object model, return the index
according to the _properties OrderedDict. If it's a list look for
2017-08-15 19:40:47 +02:00
individual objects.
"""
from .base import _STIXBase
2017-08-15 19:40:47 +02:00
try:
if tuple_to_find[1] in obj._inner.values():
return properties.index(tuple_to_find[0])
raise ValueError
except ValueError:
for pv in obj._inner.values():
if isinstance(pv, list):
for item in pv:
2017-08-15 19:54:16 +02:00
if isinstance(item, _STIXBase):
2017-08-15 19:40:47 +02:00
val = find_property_index(item,
2017-08-29 21:08:26 +02:00
item.object_properties(),
2017-08-15 19:40:47 +02:00
tuple_to_find)
if val is not None:
return val
elif isinstance(pv, dict):
if pv.get(tuple_to_find[0]) is not None:
try:
return int(tuple_to_find[0])
except ValueError:
return len(tuple_to_find[0])
for item in pv.values():
2017-08-15 19:54:16 +02:00
if isinstance(item, _STIXBase):
2017-08-15 19:40:47 +02:00
val = find_property_index(item,
2017-08-29 21:08:26 +02:00
item.object_properties(),
2017-08-15 19:40:47 +02:00
tuple_to_find)
if val is not None:
return val
2017-08-31 22:36:59 +02:00
def new_version(data, **kwargs):
"""Create a new version of a STIX object, by modifying properties and
updating the ``modified`` property.
"""
if not isinstance(data, Mapping):
raise ValueError('cannot create new version of object of this type! '
'Try a dictionary or instance of an SDO or SRO class.')
unchangable_properties = []
if data.get("revoked"):
raise RevokeError("new_version")
try:
new_obj_inner = copy.deepcopy(data._inner)
except AttributeError:
new_obj_inner = copy.deepcopy(data)
properties_to_change = kwargs.keys()
# Make sure certain properties aren't trying to change
for prop in ["created", "created_by_ref", "id", "type"]:
if prop in properties_to_change:
unchangable_properties.append(prop)
if unchangable_properties:
raise UnmodifiablePropertyError(unchangable_properties)
cls = type(data)
if 'modified' not in kwargs:
kwargs['modified'] = get_timestamp()
elif 'modified' in data:
old_modified_property = parse_into_datetime(data.get('modified'), precision='millisecond')
new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond')
if new_modified_property < old_modified_property:
raise InvalidValueError(cls, 'modified', "The new modified datetime cannot be before the current modified datatime.")
new_obj_inner.update(kwargs)
# Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass
return cls(**{k: v for k, v in new_obj_inner.items() if v is not None})
def revoke(data):
"""Revoke a STIX object.
Returns:
A new version of the object with ``revoked`` set to ``True``.
"""
if not isinstance(data, Mapping):
raise ValueError('cannot revoke object of this type! Try a dictionary '
'or instance of an SDO or SRO class.')
if data.get("revoked"):
raise RevokeError("revoke")
return new_version(data, revoked=True)