diff --git a/stix2/__init__.py b/stix2/__init__.py index 6ef8328..3991a64 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -1,274 +1,6 @@ -import collections -import datetime -import json -import uuid +"""Python APIs for STIX 2.""" -import pytz - - -# Sentinel value for fields that should be set to the current time. -# We can't use the standard 'default' approach, since if there are multiple -# timestamps in a single object, the timestamps will vary by a few microseconds. -NOW = object() - -DEFAULT_ERROR = "{type} must have {field}='{expected}'." -COMMON_PROPERTIES = { - 'type': { - 'default': (lambda x: x._type), - 'validate': (lambda x, val: val == x._type) - }, - 'id': { - 'default': (lambda x: x._make_id()), - 'validate': (lambda x, val: val.startswith(x._type + "--")), - 'expected': (lambda x: x._type + "--"), - 'error_msg': "{type} {field} values must begin with '{expected}'." - }, - 'created': { - 'default': NOW, - }, - 'modified': { - 'default': NOW, - }, -} - - -def format_datetime(dt): - # TODO: how to handle naive datetime - - # 1. Convert to UTC - # 2. Format in isoformat - # 3. Strip off "+00:00" - # 4. Add "Z" - - # TODO: how to handle timestamps with subsecond 0's - return dt.astimezone(pytz.utc).isoformat()[:-6] + "Z" - - -class STIXJSONEncoder(json.JSONEncoder): - - def default(self, obj): - if isinstance(obj, (datetime.date, datetime.datetime)): - return format_datetime(obj) - elif isinstance(obj, _STIXBase): - return dict(obj) - else: - return super(STIXJSONEncoder, self).default(obj) - - -class _STIXBase(collections.Mapping): - """Base class for STIX object types""" - - @classmethod - def _make_id(cls): - return cls._type + "--" + str(uuid.uuid4()) - - def __init__(self, **kwargs): - cls = self.__class__ - class_name = cls.__name__ - - # Use the same timestamp for any auto-generated datetimes - now = datetime.datetime.now(tz=pytz.UTC) - - # Detect any keyword arguments not allowed for a specific type - extra_kwargs = list(set(kwargs) - set(cls._properties)) - if extra_kwargs: - raise TypeError("unexpected keyword arguments: " + str(extra_kwargs)) - - required_fields = [k for k, v in cls._properties.items() if v.get('required')] - missing_kwargs = set(required_fields) - set(kwargs) - if missing_kwargs: - msg = "Missing required field(s) for {type}: ({fields})." - field_list = ", ".join(x for x in sorted(list(missing_kwargs))) - raise ValueError(msg.format(type=class_name, fields=field_list)) - - for prop_name, prop_metadata in cls._properties.items(): - if prop_name not in kwargs: - if prop_metadata.get('default'): - default = prop_metadata['default'] - if default == NOW: - kwargs[prop_name] = now - else: - kwargs[prop_name] = default(cls) - elif prop_metadata.get('fixed'): - kwargs[prop_name] = prop_metadata['fixed'] - - if prop_metadata.get('validate'): - if not prop_metadata['validate'](cls, kwargs[prop_name]): - msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format( - type=class_name, - field=prop_name, - expected=prop_metadata.get('expected', - prop_metadata['default'])(cls), - ) - raise ValueError(msg) - elif prop_metadata.get('fixed'): - if kwargs[prop_name] != prop_metadata['fixed']: - msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format( - type=class_name, - field=prop_name, - expected=prop_metadata['fixed'] - ) - raise ValueError(msg) - - self._inner = kwargs - - def __getitem__(self, key): - return self._inner[key] - - def __iter__(self): - return iter(self._inner) - - def __len__(self): - return len(self._inner) - - # Handle attribute access just like key access - def __getattr__(self, name): - return self.get(name) - - def __setattr__(self, name, value): - if name != '_inner': - raise ValueError("Cannot modify properties after creation.") - super(_STIXBase, self).__setattr__(name, value) - - def __str__(self): - # TODO: put keys in specific order. Probably need custom JSON encoder. - return json.dumps(self, indent=4, sort_keys=True, cls=STIXJSONEncoder, - separators=(",", ": ")) # Don't include spaces after commas. - - -class ExternalReference(_STIXBase): - _properties = { - 'source_name': { - 'required': True, - }, - 'description': {}, - 'url': {}, - 'external_id': {}, - } - - -class Bundle(_STIXBase): - - _type = 'bundle' - _properties = { - # Borrow the 'type' and 'id' definitions - 'type': COMMON_PROPERTIES['type'], - 'id': COMMON_PROPERTIES['id'], - 'spec_version': { - 'fixed': "2.0", - }, - 'objects': {}, - } - - def __init__(self, *args, **kwargs): - # Add any positional arguments to the 'objects' kwarg. - if args: - kwargs['objects'] = kwargs.get('objects', []) + list(args) - - super(Bundle, self).__init__(**kwargs) - - -class Indicator(_STIXBase): - - _type = 'indicator' - _properties = COMMON_PROPERTIES.copy() - _properties.update({ - 'labels': { - 'required': True, - }, - 'pattern': { - 'required': True, - }, - 'valid_from': { - 'default': NOW, - }, - }) - - def __init__(self, **kwargs): - # TODO: - # - created_by_ref - # - revoked - # - external_references - # - object_marking_refs - # - granular_markings - - # - name - # - description - # - valid_until - # - kill_chain_phases - - super(Indicator, self).__init__(**kwargs) - - -class Malware(_STIXBase): - - _type = 'malware' - _properties = COMMON_PROPERTIES.copy() - _properties.update({ - 'labels': { - 'required': True, - }, - 'name': { - 'required': True, - }, - }) - - def __init__(self, **kwargs): - # TODO: - # - created_by_ref - # - revoked - # - external_references - # - object_marking_refs - # - granular_markings - - # - description - # - kill_chain_phases - - super(Malware, self).__init__(**kwargs) - - -class Relationship(_STIXBase): - - _type = 'relationship' - _properties = COMMON_PROPERTIES.copy() - _properties.update({ - 'relationship_type': { - 'required': True, - }, - 'source_ref': { - 'required': True, - }, - 'target_ref': { - 'required': True, - }, - }) - - # Explicitly define the first three kwargs to make readable Relationship declarations. - def __init__(self, source_ref=None, relationship_type=None, target_ref=None, - **kwargs): - # TODO: - # - created_by_ref - # - revoked - # - external_references - # - object_marking_refs - # - granular_markings - - # - description - - # Allow (source_ref, relationship_type, target_ref) as positional args. - if source_ref and not kwargs.get('source_ref'): - kwargs['source_ref'] = source_ref - if relationship_type and not kwargs.get('relationship_type'): - kwargs['relationship_type'] = relationship_type - if target_ref and not kwargs.get('target_ref'): - kwargs['target_ref'] = target_ref - - # If actual STIX objects (vs. just the IDs) are passed in, extract the - # ID values to use in the Relationship object. - if kwargs.get('source_ref') and isinstance(kwargs['source_ref'], _STIXBase): - kwargs['source_ref'] = kwargs['source_ref'].id - - if kwargs.get('target_ref') and isinstance(kwargs['target_ref'], _STIXBase): - kwargs['target_ref'] = kwargs['target_ref'].id - - super(Relationship, self).__init__(**kwargs) +from .bundle import Bundle +from .common import ExternalReference +from .sdo import Indicator, Malware +from .sro import Relationship diff --git a/stix2/base.py b/stix2/base.py new file mode 100644 index 0000000..317372c --- /dev/null +++ b/stix2/base.py @@ -0,0 +1,104 @@ +"""Base class for type definitions in the stix2 library.""" + +import collections +import datetime as dt +import json +import uuid + +from .utils import format_datetime, get_timestamp, NOW + +__all__ = ['STIXJSONEncoder', '_STIXBase'] + +DEFAULT_ERROR = "{type} must have {field}='{expected}'." + + +class STIXJSONEncoder(json.JSONEncoder): + + def default(self, obj): + if isinstance(obj, (dt.date, dt.datetime)): + return format_datetime(obj) + elif isinstance(obj, _STIXBase): + return dict(obj) + else: + return super(STIXJSONEncoder, self).default(obj) + + +class _STIXBase(collections.Mapping): + """Base class for STIX object types""" + + @classmethod + def _make_id(cls): + return cls._type + "--" + str(uuid.uuid4()) + + def __init__(self, **kwargs): + cls = self.__class__ + class_name = cls.__name__ + + # Use the same timestamp for any auto-generated datetimes + now = get_timestamp() + + # Detect any keyword arguments not allowed for a specific type + extra_kwargs = list(set(kwargs) - set(cls._properties)) + if extra_kwargs: + raise TypeError("unexpected keyword arguments: " + str(extra_kwargs)) + + required_fields = [k for k, v in cls._properties.items() if v.get('required')] + missing_kwargs = set(required_fields) - set(kwargs) + if missing_kwargs: + msg = "Missing required field(s) for {type}: ({fields})." + field_list = ", ".join(x for x in sorted(list(missing_kwargs))) + raise ValueError(msg.format(type=class_name, fields=field_list)) + + for prop_name, prop_metadata in cls._properties.items(): + if prop_name not in kwargs: + if prop_metadata.get('default'): + default = prop_metadata['default'] + if default == NOW: + kwargs[prop_name] = now + else: + kwargs[prop_name] = default(cls) + elif prop_metadata.get('fixed'): + kwargs[prop_name] = prop_metadata['fixed'] + + if prop_metadata.get('validate'): + if not prop_metadata['validate'](cls, kwargs[prop_name]): + msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format( + type=class_name, + field=prop_name, + expected=prop_metadata.get('expected', + prop_metadata['default'])(cls), + ) + raise ValueError(msg) + elif prop_metadata.get('fixed'): + if kwargs[prop_name] != prop_metadata['fixed']: + msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format( + type=class_name, + field=prop_name, + expected=prop_metadata['fixed'] + ) + raise ValueError(msg) + + self._inner = kwargs + + def __getitem__(self, key): + return self._inner[key] + + def __iter__(self): + return iter(self._inner) + + def __len__(self): + return len(self._inner) + + # Handle attribute access just like key access + def __getattr__(self, name): + return self.get(name) + + def __setattr__(self, name, value): + if name != '_inner': + raise ValueError("Cannot modify properties after creation.") + super(_STIXBase, self).__setattr__(name, value) + + def __str__(self): + # TODO: put keys in specific order. Probably need custom JSON encoder. + return json.dumps(self, indent=4, sort_keys=True, cls=STIXJSONEncoder, + separators=(",", ": ")) # Don't include spaces after commas. diff --git a/stix2/bundle.py b/stix2/bundle.py new file mode 100644 index 0000000..614ce0d --- /dev/null +++ b/stix2/bundle.py @@ -0,0 +1,24 @@ +"""STIX 2 Bundle object""" + +from .base import _STIXBase +from .common import TYPE_PROPERTY, ID_PROPERTY + + +class Bundle(_STIXBase): + + _type = 'bundle' + _properties = { + 'type': TYPE_PROPERTY, + 'id': ID_PROPERTY, + 'spec_version': { + 'fixed': "2.0", + }, + 'objects': {}, + } + + def __init__(self, *args, **kwargs): + # Add any positional arguments to the 'objects' kwarg. + if args: + kwargs['objects'] = kwargs.get('objects', []) + list(args) + + super(Bundle, self).__init__(**kwargs) diff --git a/stix2/common.py b/stix2/common.py new file mode 100644 index 0000000..924210a --- /dev/null +++ b/stix2/common.py @@ -0,0 +1,38 @@ +"""STIX 2 Common Data Types and Properties""" + +from .base import _STIXBase +from .utils import NOW + +TYPE_PROPERTY = { + 'default': (lambda x: x._type), + 'validate': (lambda x, val: val == x._type) +} + +ID_PROPERTY = { + 'default': (lambda x: x._make_id()), + 'validate': (lambda x, val: val.startswith(x._type + "--")), + 'expected': (lambda x: x._type + "--"), + 'error_msg': "{type} {field} values must begin with '{expected}'." +} + +COMMON_PROPERTIES = { + 'type': TYPE_PROPERTY, + 'id': ID_PROPERTY, + 'created': { + 'default': NOW, + }, + 'modified': { + 'default': NOW, + }, +} + + +class ExternalReference(_STIXBase): + _properties = { + 'source_name': { + 'required': True, + }, + 'description': {}, + 'url': {}, + 'external_id': {}, + } diff --git a/stix2/sdo.py b/stix2/sdo.py new file mode 100644 index 0000000..3a033fe --- /dev/null +++ b/stix2/sdo.py @@ -0,0 +1,64 @@ +"""STIX 2.0 Domain Objects""" + +from .base import _STIXBase +from .common import COMMON_PROPERTIES +from .utils import NOW + + +class Indicator(_STIXBase): + + _type = 'indicator' + _properties = COMMON_PROPERTIES.copy() + _properties.update({ + 'labels': { + 'required': True, + }, + 'pattern': { + 'required': True, + }, + 'valid_from': { + 'default': NOW, + }, + }) + + def __init__(self, **kwargs): + # TODO: + # - created_by_ref + # - revoked + # - external_references + # - object_marking_refs + # - granular_markings + + # - name + # - description + # - valid_until + # - kill_chain_phases + + super(Indicator, self).__init__(**kwargs) + + +class Malware(_STIXBase): + + _type = 'malware' + _properties = COMMON_PROPERTIES.copy() + _properties.update({ + 'labels': { + 'required': True, + }, + 'name': { + 'required': True, + }, + }) + + def __init__(self, **kwargs): + # TODO: + # - created_by_ref + # - revoked + # - external_references + # - object_marking_refs + # - granular_markings + + # - description + # - kill_chain_phases + + super(Malware, self).__init__(**kwargs) diff --git a/stix2/sro.py b/stix2/sro.py new file mode 100644 index 0000000..856cf3b --- /dev/null +++ b/stix2/sro.py @@ -0,0 +1,51 @@ +"""STIX 2.0 Relationship Objects.""" + +from .base import _STIXBase +from .common import COMMON_PROPERTIES + + +class Relationship(_STIXBase): + + _type = 'relationship' + _properties = COMMON_PROPERTIES.copy() + _properties.update({ + 'relationship_type': { + 'required': True, + }, + 'source_ref': { + 'required': True, + }, + 'target_ref': { + 'required': True, + }, + }) + + # Explicitly define the first three kwargs to make readable Relationship declarations. + def __init__(self, source_ref=None, relationship_type=None, target_ref=None, + **kwargs): + # TODO: + # - created_by_ref + # - revoked + # - external_references + # - object_marking_refs + # - granular_markings + + # - description + + # Allow (source_ref, relationship_type, target_ref) as positional args. + if source_ref and not kwargs.get('source_ref'): + kwargs['source_ref'] = source_ref + if relationship_type and not kwargs.get('relationship_type'): + kwargs['relationship_type'] = relationship_type + if target_ref and not kwargs.get('target_ref'): + kwargs['target_ref'] = target_ref + + # If actual STIX objects (vs. just the IDs) are passed in, extract the + # ID values to use in the Relationship object. + if kwargs.get('source_ref') and isinstance(kwargs['source_ref'], _STIXBase): + kwargs['source_ref'] = kwargs['source_ref'].id + + if kwargs.get('target_ref') and isinstance(kwargs['target_ref'], _STIXBase): + kwargs['target_ref'] = kwargs['target_ref'].id + + super(Relationship, self).__init__(**kwargs) diff --git a/stix2/test/test_stix2.py b/stix2/test/test_stix2.py index e915284..ec05a34 100644 --- a/stix2/test/test_stix2.py +++ b/stix2/test/test_stix2.py @@ -1,32 +1,32 @@ """Tests for the stix2 library""" -import datetime +import datetime as dt import uuid import pytest import pytz import stix2 +import stix2.utils -amsterdam = pytz.timezone('Europe/Amsterdam') -eastern = pytz.timezone('US/Eastern') -FAKE_TIME = datetime.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc) + +FAKE_TIME = dt.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc) # Inspired by: http://stackoverflow.com/a/24006251 @pytest.fixture def clock(monkeypatch): - class mydatetime(datetime.datetime): + class mydatetime(dt.datetime): @classmethod def now(cls, tz=None): return FAKE_TIME - monkeypatch.setattr(datetime, 'datetime', mydatetime) + monkeypatch.setattr(dt, 'datetime', mydatetime) def test_clock(clock): - assert datetime.datetime.now() == FAKE_TIME + assert dt.datetime.now() == FAKE_TIME @pytest.fixture @@ -51,16 +51,6 @@ def test_my_uuid4_fixture(uuid4): assert uuid.uuid4() == "00000000-0000-0000-0000-000000000104" -@pytest.mark.parametrize('dt,timestamp', [ - (datetime.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'), - (amsterdam.localize(datetime.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'), - (eastern.localize(datetime.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'), - (eastern.localize(datetime.datetime(2017, 7, 1)), '2017-07-01T04:00:00Z'), -]) -def test_timestamp_formatting(dt, timestamp): - assert stix2.format_datetime(dt) == timestamp - - INDICATOR_ID = "indicator--01234567-89ab-cdef-0123-456789abcdef" MALWARE_ID = "malware--fedcba98-7654-3210-fedc-ba9876543210" RELATIONSHIP_ID = "relationship--00000000-1111-2222-3333-444444444444" @@ -114,8 +104,8 @@ EXPECTED_INDICATOR = """{ def test_indicator_with_all_required_fields(): - now = datetime.datetime(2017, 1, 1, 0, 0, 0, tzinfo=pytz.utc) - epoch = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc) + now = dt.datetime(2017, 1, 1, 0, 0, 0, tzinfo=pytz.utc) + epoch = dt.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc) indicator = stix2.Indicator( type="indicator", @@ -176,7 +166,7 @@ def test_indicator_required_field_pattern(): def test_cannot_assign_to_indicator_attributes(indicator): with pytest.raises(ValueError) as excinfo: - indicator.valid_from = datetime.datetime.now() + indicator.valid_from = dt.datetime.now() assert str(excinfo.value) == "Cannot modify properties after creation." @@ -207,7 +197,7 @@ EXPECTED_MALWARE = """{ def test_malware_with_all_required_fields(): - now = datetime.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc) + now = dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc) malware = stix2.Malware( type="malware", @@ -288,7 +278,7 @@ EXPECTED_RELATIONSHIP = """{ def test_relationship_all_required_fields(): - now = datetime.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc) + now = dt.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc) relationship = stix2.Relationship( type='relationship', diff --git a/stix2/test/test_utils.py b/stix2/test/test_utils.py new file mode 100644 index 0000000..3eee491 --- /dev/null +++ b/stix2/test/test_utils.py @@ -0,0 +1,19 @@ +import datetime as dt + +import pytest +import pytz + +import stix2.utils + +amsterdam = pytz.timezone('Europe/Amsterdam') +eastern = pytz.timezone('US/Eastern') + + +@pytest.mark.parametrize('dttm,timestamp', [ + (dt.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'), + (amsterdam.localize(dt.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'), + (eastern.localize(dt.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'), + (eastern.localize(dt.datetime(2017, 7, 1)), '2017-07-01T04:00:00Z'), +]) +def test_timestamp_formatting(dttm, timestamp): + assert stix2.utils.format_datetime(dttm) == timestamp diff --git a/stix2/utils.py b/stix2/utils.py new file mode 100644 index 0000000..0acd5de --- /dev/null +++ b/stix2/utils.py @@ -0,0 +1,26 @@ +"""Utility functions and classes for the stix2 library.""" + +import datetime as dt + +import pytz + +# Sentinel value for fields that should be set to the current time. +# We can't use the standard 'default' approach, since if there are multiple +# timestamps in a single object, the timestamps will vary by a few microseconds. +NOW = object() + + +def get_timestamp(): + return dt.datetime.now(tz=pytz.UTC) + + +def format_datetime(dttm): + # TODO: how to handle naive datetime + + # 1. Convert to UTC + # 2. Format in ISO format + # 3. Strip off "+00:00" + # 4. Add "Z" + + # TODO: how to handle timestamps with subsecond 0's + return dttm.astimezone(pytz.utc).isoformat()[:-6] + "Z"