Refactor library into separate files.

stix2.1
Greg Back 2017-02-10 15:35:02 -06:00
parent b171f025c8
commit 96e880b49b
9 changed files with 343 additions and 295 deletions

View File

@ -1,274 +1,6 @@
import collections """Python APIs for STIX 2."""
import datetime
import json
import uuid
import pytz from .bundle import Bundle
from .common import ExternalReference
from .sdo import Indicator, Malware
# Sentinel value for fields that should be set to the current time. from .sro import Relationship
# We can't use the standard 'default' approach, since if there are multiple
# timestamps in a single object, the timestamps will vary by a few microseconds.
NOW = object()
DEFAULT_ERROR = "{type} must have {field}='{expected}'."
COMMON_PROPERTIES = {
'type': {
'default': (lambda x: x._type),
'validate': (lambda x, val: val == x._type)
},
'id': {
'default': (lambda x: x._make_id()),
'validate': (lambda x, val: val.startswith(x._type + "--")),
'expected': (lambda x: x._type + "--"),
'error_msg': "{type} {field} values must begin with '{expected}'."
},
'created': {
'default': NOW,
},
'modified': {
'default': NOW,
},
}
def format_datetime(dt):
# TODO: how to handle naive datetime
# 1. Convert to UTC
# 2. Format in isoformat
# 3. Strip off "+00:00"
# 4. Add "Z"
# TODO: how to handle timestamps with subsecond 0's
return dt.astimezone(pytz.utc).isoformat()[:-6] + "Z"
class STIXJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (datetime.date, datetime.datetime)):
return format_datetime(obj)
elif isinstance(obj, _STIXBase):
return dict(obj)
else:
return super(STIXJSONEncoder, self).default(obj)
class _STIXBase(collections.Mapping):
"""Base class for STIX object types"""
@classmethod
def _make_id(cls):
return cls._type + "--" + str(uuid.uuid4())
def __init__(self, **kwargs):
cls = self.__class__
class_name = cls.__name__
# Use the same timestamp for any auto-generated datetimes
now = datetime.datetime.now(tz=pytz.UTC)
# Detect any keyword arguments not allowed for a specific type
extra_kwargs = list(set(kwargs) - set(cls._properties))
if extra_kwargs:
raise TypeError("unexpected keyword arguments: " + str(extra_kwargs))
required_fields = [k for k, v in cls._properties.items() if v.get('required')]
missing_kwargs = set(required_fields) - set(kwargs)
if missing_kwargs:
msg = "Missing required field(s) for {type}: ({fields})."
field_list = ", ".join(x for x in sorted(list(missing_kwargs)))
raise ValueError(msg.format(type=class_name, fields=field_list))
for prop_name, prop_metadata in cls._properties.items():
if prop_name not in kwargs:
if prop_metadata.get('default'):
default = prop_metadata['default']
if default == NOW:
kwargs[prop_name] = now
else:
kwargs[prop_name] = default(cls)
elif prop_metadata.get('fixed'):
kwargs[prop_name] = prop_metadata['fixed']
if prop_metadata.get('validate'):
if not prop_metadata['validate'](cls, kwargs[prop_name]):
msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format(
type=class_name,
field=prop_name,
expected=prop_metadata.get('expected',
prop_metadata['default'])(cls),
)
raise ValueError(msg)
elif prop_metadata.get('fixed'):
if kwargs[prop_name] != prop_metadata['fixed']:
msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format(
type=class_name,
field=prop_name,
expected=prop_metadata['fixed']
)
raise ValueError(msg)
self._inner = kwargs
def __getitem__(self, key):
return self._inner[key]
def __iter__(self):
return iter(self._inner)
def __len__(self):
return len(self._inner)
# Handle attribute access just like key access
def __getattr__(self, name):
return self.get(name)
def __setattr__(self, name, value):
if name != '_inner':
raise ValueError("Cannot modify properties after creation.")
super(_STIXBase, self).__setattr__(name, value)
def __str__(self):
# TODO: put keys in specific order. Probably need custom JSON encoder.
return json.dumps(self, indent=4, sort_keys=True, cls=STIXJSONEncoder,
separators=(",", ": ")) # Don't include spaces after commas.
class ExternalReference(_STIXBase):
_properties = {
'source_name': {
'required': True,
},
'description': {},
'url': {},
'external_id': {},
}
class Bundle(_STIXBase):
_type = 'bundle'
_properties = {
# Borrow the 'type' and 'id' definitions
'type': COMMON_PROPERTIES['type'],
'id': COMMON_PROPERTIES['id'],
'spec_version': {
'fixed': "2.0",
},
'objects': {},
}
def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg.
if args:
kwargs['objects'] = kwargs.get('objects', []) + list(args)
super(Bundle, self).__init__(**kwargs)
class Indicator(_STIXBase):
_type = 'indicator'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'labels': {
'required': True,
},
'pattern': {
'required': True,
},
'valid_from': {
'default': NOW,
},
})
def __init__(self, **kwargs):
# TODO:
# - created_by_ref
# - revoked
# - external_references
# - object_marking_refs
# - granular_markings
# - name
# - description
# - valid_until
# - kill_chain_phases
super(Indicator, self).__init__(**kwargs)
class Malware(_STIXBase):
_type = 'malware'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'labels': {
'required': True,
},
'name': {
'required': True,
},
})
def __init__(self, **kwargs):
# TODO:
# - created_by_ref
# - revoked
# - external_references
# - object_marking_refs
# - granular_markings
# - description
# - kill_chain_phases
super(Malware, self).__init__(**kwargs)
class Relationship(_STIXBase):
_type = 'relationship'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'relationship_type': {
'required': True,
},
'source_ref': {
'required': True,
},
'target_ref': {
'required': True,
},
})
# Explicitly define the first three kwargs to make readable Relationship declarations.
def __init__(self, source_ref=None, relationship_type=None, target_ref=None,
**kwargs):
# TODO:
# - created_by_ref
# - revoked
# - external_references
# - object_marking_refs
# - granular_markings
# - description
# Allow (source_ref, relationship_type, target_ref) as positional args.
if source_ref and not kwargs.get('source_ref'):
kwargs['source_ref'] = source_ref
if relationship_type and not kwargs.get('relationship_type'):
kwargs['relationship_type'] = relationship_type
if target_ref and not kwargs.get('target_ref'):
kwargs['target_ref'] = target_ref
# If actual STIX objects (vs. just the IDs) are passed in, extract the
# ID values to use in the Relationship object.
if kwargs.get('source_ref') and isinstance(kwargs['source_ref'], _STIXBase):
kwargs['source_ref'] = kwargs['source_ref'].id
if kwargs.get('target_ref') and isinstance(kwargs['target_ref'], _STIXBase):
kwargs['target_ref'] = kwargs['target_ref'].id
super(Relationship, self).__init__(**kwargs)

104
stix2/base.py Normal file
View File

@ -0,0 +1,104 @@
"""Base class for type definitions in the stix2 library."""
import collections
import datetime as dt
import json
import uuid
from .utils import format_datetime, get_timestamp, NOW
__all__ = ['STIXJSONEncoder', '_STIXBase']
DEFAULT_ERROR = "{type} must have {field}='{expected}'."
class STIXJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (dt.date, dt.datetime)):
return format_datetime(obj)
elif isinstance(obj, _STIXBase):
return dict(obj)
else:
return super(STIXJSONEncoder, self).default(obj)
class _STIXBase(collections.Mapping):
"""Base class for STIX object types"""
@classmethod
def _make_id(cls):
return cls._type + "--" + str(uuid.uuid4())
def __init__(self, **kwargs):
cls = self.__class__
class_name = cls.__name__
# Use the same timestamp for any auto-generated datetimes
now = get_timestamp()
# Detect any keyword arguments not allowed for a specific type
extra_kwargs = list(set(kwargs) - set(cls._properties))
if extra_kwargs:
raise TypeError("unexpected keyword arguments: " + str(extra_kwargs))
required_fields = [k for k, v in cls._properties.items() if v.get('required')]
missing_kwargs = set(required_fields) - set(kwargs)
if missing_kwargs:
msg = "Missing required field(s) for {type}: ({fields})."
field_list = ", ".join(x for x in sorted(list(missing_kwargs)))
raise ValueError(msg.format(type=class_name, fields=field_list))
for prop_name, prop_metadata in cls._properties.items():
if prop_name not in kwargs:
if prop_metadata.get('default'):
default = prop_metadata['default']
if default == NOW:
kwargs[prop_name] = now
else:
kwargs[prop_name] = default(cls)
elif prop_metadata.get('fixed'):
kwargs[prop_name] = prop_metadata['fixed']
if prop_metadata.get('validate'):
if not prop_metadata['validate'](cls, kwargs[prop_name]):
msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format(
type=class_name,
field=prop_name,
expected=prop_metadata.get('expected',
prop_metadata['default'])(cls),
)
raise ValueError(msg)
elif prop_metadata.get('fixed'):
if kwargs[prop_name] != prop_metadata['fixed']:
msg = prop_metadata.get('error_msg', DEFAULT_ERROR).format(
type=class_name,
field=prop_name,
expected=prop_metadata['fixed']
)
raise ValueError(msg)
self._inner = kwargs
def __getitem__(self, key):
return self._inner[key]
def __iter__(self):
return iter(self._inner)
def __len__(self):
return len(self._inner)
# Handle attribute access just like key access
def __getattr__(self, name):
return self.get(name)
def __setattr__(self, name, value):
if name != '_inner':
raise ValueError("Cannot modify properties after creation.")
super(_STIXBase, self).__setattr__(name, value)
def __str__(self):
# TODO: put keys in specific order. Probably need custom JSON encoder.
return json.dumps(self, indent=4, sort_keys=True, cls=STIXJSONEncoder,
separators=(",", ": ")) # Don't include spaces after commas.

24
stix2/bundle.py Normal file
View File

@ -0,0 +1,24 @@
"""STIX 2 Bundle object"""
from .base import _STIXBase
from .common import TYPE_PROPERTY, ID_PROPERTY
class Bundle(_STIXBase):
_type = 'bundle'
_properties = {
'type': TYPE_PROPERTY,
'id': ID_PROPERTY,
'spec_version': {
'fixed': "2.0",
},
'objects': {},
}
def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg.
if args:
kwargs['objects'] = kwargs.get('objects', []) + list(args)
super(Bundle, self).__init__(**kwargs)

38
stix2/common.py Normal file
View File

@ -0,0 +1,38 @@
"""STIX 2 Common Data Types and Properties"""
from .base import _STIXBase
from .utils import NOW
TYPE_PROPERTY = {
'default': (lambda x: x._type),
'validate': (lambda x, val: val == x._type)
}
ID_PROPERTY = {
'default': (lambda x: x._make_id()),
'validate': (lambda x, val: val.startswith(x._type + "--")),
'expected': (lambda x: x._type + "--"),
'error_msg': "{type} {field} values must begin with '{expected}'."
}
COMMON_PROPERTIES = {
'type': TYPE_PROPERTY,
'id': ID_PROPERTY,
'created': {
'default': NOW,
},
'modified': {
'default': NOW,
},
}
class ExternalReference(_STIXBase):
_properties = {
'source_name': {
'required': True,
},
'description': {},
'url': {},
'external_id': {},
}

64
stix2/sdo.py Normal file
View File

@ -0,0 +1,64 @@
"""STIX 2.0 Domain Objects"""
from .base import _STIXBase
from .common import COMMON_PROPERTIES
from .utils import NOW
class Indicator(_STIXBase):
_type = 'indicator'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'labels': {
'required': True,
},
'pattern': {
'required': True,
},
'valid_from': {
'default': NOW,
},
})
def __init__(self, **kwargs):
# TODO:
# - created_by_ref
# - revoked
# - external_references
# - object_marking_refs
# - granular_markings
# - name
# - description
# - valid_until
# - kill_chain_phases
super(Indicator, self).__init__(**kwargs)
class Malware(_STIXBase):
_type = 'malware'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'labels': {
'required': True,
},
'name': {
'required': True,
},
})
def __init__(self, **kwargs):
# TODO:
# - created_by_ref
# - revoked
# - external_references
# - object_marking_refs
# - granular_markings
# - description
# - kill_chain_phases
super(Malware, self).__init__(**kwargs)

51
stix2/sro.py Normal file
View File

@ -0,0 +1,51 @@
"""STIX 2.0 Relationship Objects."""
from .base import _STIXBase
from .common import COMMON_PROPERTIES
class Relationship(_STIXBase):
_type = 'relationship'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'relationship_type': {
'required': True,
},
'source_ref': {
'required': True,
},
'target_ref': {
'required': True,
},
})
# Explicitly define the first three kwargs to make readable Relationship declarations.
def __init__(self, source_ref=None, relationship_type=None, target_ref=None,
**kwargs):
# TODO:
# - created_by_ref
# - revoked
# - external_references
# - object_marking_refs
# - granular_markings
# - description
# Allow (source_ref, relationship_type, target_ref) as positional args.
if source_ref and not kwargs.get('source_ref'):
kwargs['source_ref'] = source_ref
if relationship_type and not kwargs.get('relationship_type'):
kwargs['relationship_type'] = relationship_type
if target_ref and not kwargs.get('target_ref'):
kwargs['target_ref'] = target_ref
# If actual STIX objects (vs. just the IDs) are passed in, extract the
# ID values to use in the Relationship object.
if kwargs.get('source_ref') and isinstance(kwargs['source_ref'], _STIXBase):
kwargs['source_ref'] = kwargs['source_ref'].id
if kwargs.get('target_ref') and isinstance(kwargs['target_ref'], _STIXBase):
kwargs['target_ref'] = kwargs['target_ref'].id
super(Relationship, self).__init__(**kwargs)

View File

@ -1,32 +1,32 @@
"""Tests for the stix2 library""" """Tests for the stix2 library"""
import datetime import datetime as dt
import uuid import uuid
import pytest import pytest
import pytz import pytz
import stix2 import stix2
import stix2.utils
amsterdam = pytz.timezone('Europe/Amsterdam')
eastern = pytz.timezone('US/Eastern') FAKE_TIME = dt.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc)
FAKE_TIME = datetime.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc)
# Inspired by: http://stackoverflow.com/a/24006251 # Inspired by: http://stackoverflow.com/a/24006251
@pytest.fixture @pytest.fixture
def clock(monkeypatch): def clock(monkeypatch):
class mydatetime(datetime.datetime): class mydatetime(dt.datetime):
@classmethod @classmethod
def now(cls, tz=None): def now(cls, tz=None):
return FAKE_TIME return FAKE_TIME
monkeypatch.setattr(datetime, 'datetime', mydatetime) monkeypatch.setattr(dt, 'datetime', mydatetime)
def test_clock(clock): def test_clock(clock):
assert datetime.datetime.now() == FAKE_TIME assert dt.datetime.now() == FAKE_TIME
@pytest.fixture @pytest.fixture
@ -51,16 +51,6 @@ def test_my_uuid4_fixture(uuid4):
assert uuid.uuid4() == "00000000-0000-0000-0000-000000000104" assert uuid.uuid4() == "00000000-0000-0000-0000-000000000104"
@pytest.mark.parametrize('dt,timestamp', [
(datetime.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'),
(amsterdam.localize(datetime.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'),
(eastern.localize(datetime.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'),
(eastern.localize(datetime.datetime(2017, 7, 1)), '2017-07-01T04:00:00Z'),
])
def test_timestamp_formatting(dt, timestamp):
assert stix2.format_datetime(dt) == timestamp
INDICATOR_ID = "indicator--01234567-89ab-cdef-0123-456789abcdef" INDICATOR_ID = "indicator--01234567-89ab-cdef-0123-456789abcdef"
MALWARE_ID = "malware--fedcba98-7654-3210-fedc-ba9876543210" MALWARE_ID = "malware--fedcba98-7654-3210-fedc-ba9876543210"
RELATIONSHIP_ID = "relationship--00000000-1111-2222-3333-444444444444" RELATIONSHIP_ID = "relationship--00000000-1111-2222-3333-444444444444"
@ -114,8 +104,8 @@ EXPECTED_INDICATOR = """{
def test_indicator_with_all_required_fields(): def test_indicator_with_all_required_fields():
now = datetime.datetime(2017, 1, 1, 0, 0, 0, tzinfo=pytz.utc) now = dt.datetime(2017, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
epoch = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc) epoch = dt.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
indicator = stix2.Indicator( indicator = stix2.Indicator(
type="indicator", type="indicator",
@ -176,7 +166,7 @@ def test_indicator_required_field_pattern():
def test_cannot_assign_to_indicator_attributes(indicator): def test_cannot_assign_to_indicator_attributes(indicator):
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
indicator.valid_from = datetime.datetime.now() indicator.valid_from = dt.datetime.now()
assert str(excinfo.value) == "Cannot modify properties after creation." assert str(excinfo.value) == "Cannot modify properties after creation."
@ -207,7 +197,7 @@ EXPECTED_MALWARE = """{
def test_malware_with_all_required_fields(): def test_malware_with_all_required_fields():
now = datetime.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc) now = dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc)
malware = stix2.Malware( malware = stix2.Malware(
type="malware", type="malware",
@ -288,7 +278,7 @@ EXPECTED_RELATIONSHIP = """{
def test_relationship_all_required_fields(): def test_relationship_all_required_fields():
now = datetime.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc) now = dt.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc)
relationship = stix2.Relationship( relationship = stix2.Relationship(
type='relationship', type='relationship',

19
stix2/test/test_utils.py Normal file
View File

@ -0,0 +1,19 @@
import datetime as dt
import pytest
import pytz
import stix2.utils
amsterdam = pytz.timezone('Europe/Amsterdam')
eastern = pytz.timezone('US/Eastern')
@pytest.mark.parametrize('dttm,timestamp', [
(dt.datetime(2017, 1, 1, tzinfo=pytz.utc), '2017-01-01T00:00:00Z'),
(amsterdam.localize(dt.datetime(2017, 1, 1)), '2016-12-31T23:00:00Z'),
(eastern.localize(dt.datetime(2017, 1, 1, 12, 34, 56)), '2017-01-01T17:34:56Z'),
(eastern.localize(dt.datetime(2017, 7, 1)), '2017-07-01T04:00:00Z'),
])
def test_timestamp_formatting(dttm, timestamp):
assert stix2.utils.format_datetime(dttm) == timestamp

26
stix2/utils.py Normal file
View File

@ -0,0 +1,26 @@
"""Utility functions and classes for the stix2 library."""
import datetime as dt
import pytz
# Sentinel value for fields that should be set to the current time.
# We can't use the standard 'default' approach, since if there are multiple
# timestamps in a single object, the timestamps will vary by a few microseconds.
NOW = object()
def get_timestamp():
return dt.datetime.now(tz=pytz.UTC)
def format_datetime(dttm):
# TODO: how to handle naive datetime
# 1. Convert to UTC
# 2. Format in ISO format
# 3. Strip off "+00:00"
# 4. Add "Z"
# TODO: how to handle timestamps with subsecond 0's
return dttm.astimezone(pytz.utc).isoformat()[:-6] + "Z"