"""Utility functions and classes for the STIX2 library.""" try: from collections.abc import Mapping except ImportError: from collections import Mapping import copy import datetime as dt import json import re from dateutil import parser import pytz import stix2.base from .exceptions import ( InvalidValueError, RevokeError, UnmodifiablePropertyError, ) # Sentinel value for properties that should be set to the current time. # We can't use the standard 'default' approach, since if there are multiple # timestamps in a single object, the timestamps will vary by a few microseconds. NOW = object() # STIX object properties that cannot be modified STIX_UNMOD_PROPERTIES = ['created', 'created_by_ref', 'id', 'type'] TYPE_REGEX = re.compile(r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$') SCO21_TYPE_REGEX = re.compile(r'^\-?([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-?$') SCO21_EXT_REGEX = re.compile(r'^\-?([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-ext$') PREFIX_21_REGEX = re.compile(r'^[a-z].*') class STIXdatetime(dt.datetime): def __new__(cls, *args, **kwargs): precision = kwargs.pop('precision', None) if isinstance(args[0], dt.datetime): # Allow passing in a datetime object dttm = args[0] args = ( dttm.year, dttm.month, dttm.day, dttm.hour, dttm.minute, dttm.second, dttm.microsecond, dttm.tzinfo, ) # self will be an instance of STIXdatetime, not dt.datetime self = dt.datetime.__new__(cls, *args, **kwargs) self.precision = precision return self def __repr__(self): return "'%s'" % format_datetime(self) def deduplicate(stix_obj_list): """Deduplicate a list of STIX objects to a unique set. Reduces a set of STIX objects to unique set by looking at 'id' and 'modified' fields - as a unique object version is determined by the combination of those fields Note: Be aware, as can be seen in the implementation of deduplicate(),that if the "stix_obj_list" argument has multiple STIX objects of the same version, the last object version found in the list will be the one that is returned. Args: stix_obj_list (list): list of STIX objects (dicts) Returns: A list with a unique set of the passed list of STIX objects. """ unique_objs = {} for obj in stix_obj_list: try: unique_objs[(obj['id'], obj['modified'])] = obj except KeyError: # Handle objects with no `modified` property, e.g. marking-definition unique_objs[(obj['id'], obj['created'])] = obj return list(unique_objs.values()) def get_timestamp(): """Return a STIX timestamp of the current date and time.""" return STIXdatetime.now(tz=pytz.UTC) def format_datetime(dttm): """Convert a datetime object into a valid STIX timestamp string. 1. Convert to timezone-aware 2. Convert to UTC 3. Format in ISO format 4. Ensure correct precision a. Add subsecond value if non-zero and precision not defined 5. Add "Z" """ if dttm.tzinfo is None or dttm.tzinfo.utcoffset(dttm) is None: # dttm is timezone-naive; assume UTC zoned = pytz.utc.localize(dttm) else: zoned = dttm.astimezone(pytz.utc) ts = zoned.strftime('%Y-%m-%dT%H:%M:%S') ms = zoned.strftime('%f') precision = getattr(dttm, 'precision', None) if precision == 'second': pass # Already precise to the second elif precision == 'millisecond': ts = ts + '.' + ms[:3] elif zoned.microsecond > 0: ts = ts + '.' + ms.rstrip('0') return ts + 'Z' def parse_into_datetime(value, precision=None): """Parse a value into a valid STIX timestamp object. """ if isinstance(value, dt.date): if hasattr(value, 'hour'): ts = value else: # Add a time component ts = dt.datetime.combine(value, dt.time(0, 0, tzinfo=pytz.utc)) else: # value isn't a date or datetime object so assume it's a string try: parsed = parser.parse(value) except (TypeError, ValueError): # Unknown format raise ValueError( "must be a datetime object, date object, or " "timestamp string in a recognizable format.", ) if parsed.tzinfo: ts = parsed.astimezone(pytz.utc) else: # Doesn't have timezone info in the string; assume UTC ts = pytz.utc.localize(parsed) # Ensure correct precision if not precision: return STIXdatetime(ts, precision=precision) ms = ts.microsecond if precision == 'second': ts = ts.replace(microsecond=0) elif precision == 'millisecond': ms_len = len(str(ms)) if ms_len > 3: # Truncate to millisecond precision factor = 10 ** (ms_len - 3) ts = ts.replace(microsecond=(ts.microsecond // factor) * factor) else: ts = ts.replace(microsecond=0) return STIXdatetime(ts, precision=precision) def _get_dict(data): """Return data as a dictionary. Input can be a dictionary, string, or file-like object. """ if type(data) is dict: return data else: try: return json.loads(data) except TypeError: pass try: return json.load(data) except AttributeError: pass try: return dict(data) except (ValueError, TypeError): raise ValueError("Cannot convert '%s' to dictionary." % str(data)) def _find(seq, val): """ Search sequence 'seq' for val. This behaves like str.find(): if not found, -1 is returned instead of throwing an exception. Args: seq: The sequence to search val: The value to search for Returns: int: The index of the value if found, or -1 if not found """ try: return seq.index(val) except ValueError: return -1 def _find_property_in_seq(seq, search_key, search_value): """ Helper for find_property_index(): search for the property in all elements of the given sequence. Args: seq: The sequence search_key: Property name to find search_value: Property value to find Returns: int: A property index, or -1 if the property was not found """ idx = -1 for elem in seq: idx = find_property_index(elem, search_key, search_value) if idx >= 0: break return idx def find_property_index(obj, search_key, search_value): """ Search (recursively) for the given key and value in the given object. Return an index for the key, relative to whatever object it's found in. Args: obj: The object to search (list, dict, or stix object) search_key: A search key search_value: A search value Returns: int: An index; -1 if the key and value aren't found """ from .base import _STIXBase # Special-case keys which are numbers-as-strings, e.g. for cyber-observable # mappings. Use the int value of the key as the index. if search_key.isdigit(): return int(search_key) if isinstance(obj, _STIXBase): if search_key in obj and obj[search_key] == search_value: idx = _find(obj.object_properties(), search_key) else: idx = _find_property_in_seq(obj.values(), search_key, search_value) elif isinstance(obj, dict): if search_key in obj and obj[search_key] == search_value: idx = _find(sorted(obj), search_key) else: idx = _find_property_in_seq(obj.values(), search_key, search_value) elif isinstance(obj, list): idx = _find_property_in_seq(obj, search_key, search_value) else: # Don't know how to search this type idx = -1 return idx def new_version(data, **kwargs): """Create a new version of a STIX object, by modifying properties and updating the ``modified`` property. """ if not isinstance(data, Mapping): raise ValueError( "cannot create new version of object of this type! " "Try a dictionary or instance of an SDO or SRO class.", ) unchangable_properties = [] if data.get('revoked'): raise RevokeError("new_version") try: new_obj_inner = copy.deepcopy(data._inner) except AttributeError: new_obj_inner = copy.deepcopy(data) properties_to_change = kwargs.keys() # Make sure certain properties aren't trying to change for prop in STIX_UNMOD_PROPERTIES: if prop in properties_to_change: unchangable_properties.append(prop) if unchangable_properties: raise UnmodifiablePropertyError(unchangable_properties) cls = type(data) if 'modified' not in kwargs: kwargs['modified'] = get_timestamp() elif 'modified' in data: old_modified_property = parse_into_datetime(data.get('modified'), precision='millisecond') new_modified_property = parse_into_datetime(kwargs['modified'], precision='millisecond') if new_modified_property <= old_modified_property: raise InvalidValueError( cls, 'modified', "The new modified datetime cannot be before than or equal to the current modified datetime." "It cannot be equal, as according to STIX 2 specification, objects that are different " "but have the same id and modified timestamp do not have defined consumer behavior.", ) new_obj_inner.update(kwargs) # Exclude properties with a value of 'None' in case data is not an instance of a _STIXBase subclass return cls(**{k: v for k, v in new_obj_inner.items() if v is not None}) def revoke(data): """Revoke a STIX object. Returns: A new version of the object with ``revoked`` set to ``True``. """ if not isinstance(data, Mapping): raise ValueError( "cannot revoke object of this type! Try a dictionary " "or instance of an SDO or SRO class.", ) if data.get('revoked'): raise RevokeError("revoke") return new_version(data, revoked=True, allow_custom=True) def get_class_hierarchy_names(obj): """Given an object, return the names of the class hierarchy.""" names = [] for cls in obj.__class__.__mro__: names.append(cls.__name__) return names def remove_custom_stix(stix_obj): """Remove any custom STIX objects or properties. Warnings: This function is a best effort utility, in that it will remove custom objects and properties based on the type names; i.e. if "x-" prefixes object types, and "x\\_" prefixes property types. According to the STIX2 spec, those naming conventions are a SHOULDs not MUSTs, meaning that valid custom STIX content may ignore those conventions and in effect render this utility function invalid when used on that STIX content. Args: stix_obj (dict OR python-stix obj): a single python-stix object or dict of a STIX object Returns: A new version of the object with any custom content removed """ if stix_obj['type'].startswith('x-'): # if entire object is custom, discard return None custom_props = [] for prop in stix_obj.items(): if prop[0].startswith('x_'): # for every custom property, record it and set value to None # (so we can pass it to new_version() and it will be dropped) custom_props.append((prop[0], None)) if custom_props: # obtain set of object properties that can be transferred # to a new object version. This is 1)custom props with their # values set to None, and 2)any properties left that are not # unmodifiable STIX properties or the "modified" property # set of properties that are not supplied to new_version() # to be used for updating properties. This includes unmodifiable # properties (properties that new_version() just re-uses from the # existing STIX object) and the "modified" property. We dont supply the # "modified" property so that new_version() creates a new datetime # value for this property non_supplied_props = STIX_UNMOD_PROPERTIES + ['modified'] props = [(prop, stix_obj[prop]) for prop in stix_obj if prop not in non_supplied_props] # add to set the custom properties we want to get rid of (with their value=None) props.extend(custom_props) new_obj = new_version(stix_obj, **(dict(props))) while parse_into_datetime(new_obj['modified']) == parse_into_datetime(stix_obj['modified']): # Prevents bug when fast computation allows multiple STIX object # versions to be created in single unit of time new_obj = new_version(stix_obj, **(dict(props))) return new_obj else: return stix_obj def get_type_from_id(stix_id): return stix_id.split('--', 1)[0] def is_marking(obj_or_id): """Determines whether the given object or object ID is/is for a marking definition. :param obj_or_id: A STIX object or object ID as a string. :return: True if a marking definition, False otherwise. """ if isinstance(obj_or_id, (stix2.base._STIXBase, dict)): result = obj_or_id["type"] == "marking-definition" else: # it's a string ID result = obj_or_id.startswith("marking-definition--") return result