Merge branch 'master' into markings

stix2.1
Greg Back 2017-08-31 20:36:59 +00:00
commit 124da846c3
39 changed files with 3178 additions and 1694 deletions

View File

@ -1,7 +1,7 @@
[settings]
check=1
diff=1
known_third_party=dateutil,pytest,pytz,six,requests
known_third_party=ordereddict,dateutil,pytest,pytz,requests,simplejson,six,stix2patterns,stix2validator,taxii2client
known_first_party=stix2
not_skip=__init__.py
force_sort_within_sections=1

View File

@ -39,8 +39,8 @@ constructor:
from stix2 import Indicator
indicator = Indicator(name="File hash for malware variant",
labels=['malicious-activity'],
pattern='file:hashes.md5 = "d41d8cd98f00b204e9800998ecf8427e"')
labels=["malicious-activity"],
pattern="[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']")
Certain required attributes of all objects will be set automatically if
not provided as keyword arguments:
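For example (a sketch consistent with the property defaults shown later in this diff — the UUID shown is a placeholder), the type, id, created, and modified properties are filled in automatically when omitted:

    indicator = Indicator(name="File hash for malware variant",
                          labels=["malicious-activity"],
                          pattern="[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']")
    print(indicator.type)       # 'indicator'
    print(indicator.id)         # 'indicator--<random UUID>'
    print(indicator.created)    # current timestamp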

View File

@ -19,6 +19,7 @@ def get_version():
with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
long_description = f.read()
setup(
name='stix2',
version=get_version(),
@ -46,9 +47,14 @@ setup(
keywords="stix stix2 json cti cyber threat intelligence",
packages=find_packages(),
install_requires=[
'pytz',
'six',
'ordereddict ; python_version<"2.7"',
'python-dateutil',
'pytz',
'requests',
'simplejson',
'six',
'stix2-patterns',
'stix2-validator',
'taxii2-client',
],
)

View File

@ -3,7 +3,7 @@
# flake8: noqa
from . import exceptions
from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking,
ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .core import Bundle, _register_type, parse

View File

@ -3,7 +3,8 @@
import collections
import copy
import datetime as dt
import json
import simplejson as json
from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
ExtraPropertiesError, ImmutableError,
@ -11,7 +12,7 @@ from .exceptions import (AtLeastOnePropertyError, DependentPropertiesError,
MissingPropertiesError,
MutuallyExclusivePropertiesError)
from .markings.utils import validate
from .utils import NOW, format_datetime, get_timestamp
from .utils import NOW, find_property_index, format_datetime, get_timestamp
from .utils import new_version as _new_version
from .utils import revoke as _revoke
@ -38,6 +39,9 @@ def get_required_properties(properties):
class _STIXBase(collections.Mapping):
"""Base class for STIX object types"""
def object_properties(self):
return list(self._properties.keys())
def _check_property(self, prop_name, prop, kwargs):
if prop_name not in kwargs:
if hasattr(prop, 'default'):
@ -142,12 +146,18 @@ class _STIXBase(collections.Mapping):
super(_STIXBase, self).__setattr__(name, value)
def __str__(self):
# TODO: put keys in specific order. Probably need custom JSON encoder.
return json.dumps(self, indent=4, sort_keys=True, cls=STIXJSONEncoder,
separators=(",", ": ")) # Don't include spaces after commas.
properties = self.object_properties()
def sort_by(element):
return find_property_index(self, properties, element)
# separators kwarg -> don't include spaces after commas.
return json.dumps(self, indent=4, cls=STIXJSONEncoder,
item_sort_key=sort_by,
separators=(",", ": "))
def __repr__(self):
props = [(k, self[k]) for k in sorted(self._properties) if self.get(k)]
props = [(k, self[k]) for k in self.object_properties() if self.get(k)]
return "{0}({1})".format(self.__class__.__name__,
", ".join(["{0!s}={1!r}".format(k, v) for k, v in props]))
@ -186,18 +196,14 @@ class _Observable(_STIXBase):
try:
allowed_types = prop.contained.valid_types
except AttributeError:
try:
allowed_types = prop.valid_types
except AttributeError:
raise ValueError("'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty or a ListProperty "
"containing ObjectReferenceProperty." % prop_name)
allowed_types = prop.valid_types
try:
ref_type = self._STIXBase__valid_refs[ref]
except TypeError:
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
if allowed_types:
try:
ref_type = self._STIXBase__valid_refs[ref]
except TypeError:
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
if ref_type not in allowed_types:
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
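A minimal sketch of the simplejson item_sort_key hook this change relies on (the names below are illustrative, not the library's internals; in the library, find_property_index does the real index lookup):

import simplejson as json

properties = ['type', 'id', 'name']   # declared property order

def sort_by(item):
    key, _ = item
    # declared keys keep their position; anything unknown sorts last
    return properties.index(key) if key in properties else len(properties)

obj = {'name': 'demo', 'id': 'x--1', 'type': 'x'}
print(json.dumps(obj, indent=4, item_sort_key=sort_by, separators=(",", ": ")))
# keys are emitted in the order: type, id, name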

View File

@ -1,19 +1,26 @@
"""STIX 2 Common Data Types and Properties"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from .base import _STIXBase
from .properties import (BooleanProperty, IDProperty, ListProperty, Property,
from .properties import (HashesProperty, IDProperty, ListProperty, Property,
ReferenceProperty, SelectorProperty, StringProperty,
TimestampProperty, TypeProperty)
from .utils import NOW, get_dict
class ExternalReference(_STIXBase):
_properties = {
'source_name': StringProperty(required=True),
'description': StringProperty(),
'url': StringProperty(),
'external_id': StringProperty(),
}
_properties = OrderedDict()
_properties.update([
('source_name', StringProperty(required=True)),
('description', StringProperty()),
('url', StringProperty()),
('hashes', HashesProperty()),
('external_id', StringProperty()),
])
def _check_object_constraints(self):
super(ExternalReference, self)._check_object_constraints()
@ -21,30 +28,36 @@ class ExternalReference(_STIXBase):
class KillChainPhase(_STIXBase):
_properties = {
'kill_chain_name': StringProperty(required=True),
'phase_name': StringProperty(required=True),
}
_properties = OrderedDict()
_properties.update([
('kill_chain_name', StringProperty(required=True)),
('phase_name', StringProperty(required=True)),
])
class GranularMarking(_STIXBase):
_properties = {
'marking_ref': ReferenceProperty(required=True, type="marking-definition"),
'selectors': ListProperty(SelectorProperty, required=True),
}
_properties = OrderedDict()
_properties.update([
('marking_ref', ReferenceProperty(required=True, type="marking-definition")),
('selectors', ListProperty(SelectorProperty, required=True)),
])
class TLPMarking(_STIXBase):
# TODO: don't allow creation of any TLPMarkings other than the ones below
_properties = {
'tlp': Property(required=True)
}
_type = 'tlp'
_properties = OrderedDict()
_properties.update([
('tlp', Property(required=True))
])
class StatementMarking(_STIXBase):
_properties = {
'statement': StringProperty(required=True)
}
_type = 'statement'
_properties = OrderedDict()
_properties.update([
('statement', StringProperty(required=True))
])
def __init__(self, statement=None, **kwargs):
# Allow statement as positional args.
@ -60,35 +73,32 @@ class MarkingProperty(Property):
"""
def clean(self, value):
if type(value) in [TLPMarking, StatementMarking]:
if type(value) in OBJ_MAP_MARKING.values():
return value
else:
raise ValueError("must be a Statement or TLP Marking.")
raise ValueError("must be a Statement, TLP Marking or a registered marking.")
class MarkingDefinition(_STIXBase):
_type = 'marking-definition'
_properties = {
'created': TimestampProperty(default=lambda: NOW),
'external_references': ListProperty(ExternalReference),
'created_by_ref': ReferenceProperty(type="identity"),
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),
'granular_markings': ListProperty(GranularMarking),
'type': TypeProperty(_type),
'id': IDProperty(_type),
'definition_type': StringProperty(required=True),
'definition': MarkingProperty(required=True),
}
marking_map = {
'tlp': TLPMarking,
'statement': StatementMarking,
}
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
('definition_type', StringProperty(required=True)),
('definition', MarkingProperty(required=True)),
])
def __init__(self, **kwargs):
if set(('definition_type', 'definition')).issubset(kwargs.keys()):
# Create correct marking type object
try:
marking_type = self.marking_map[kwargs['definition_type']]
marking_type = OBJ_MAP_MARKING[kwargs['definition_type']]
except KeyError:
raise ValueError("definition_type must be a valid marking type")
@ -99,41 +109,78 @@ class MarkingDefinition(_STIXBase):
super(MarkingDefinition, self).__init__(**kwargs)
OBJ_MAP_MARKING = {
'tlp': TLPMarking,
'statement': StatementMarking,
}
def _register_marking(cls):
"""Register a custom STIX Marking Definition type.
"""
OBJ_MAP_MARKING[cls._type] = cls
return cls
def CustomMarking(type='x-custom-marking', properties=None):
"""
Custom STIX Marking decorator.
Examples:
@CustomMarking('x-custom-marking', [
('property1', StringProperty(required=True)),
('property2', IntegerProperty()),
])
class MyNewMarkingObjectType():
pass
"""
def custom_builder(cls):
class _Custom(cls, _STIXBase):
_type = type
_properties = OrderedDict()
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties)
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
cls.__init__(self, **kwargs)
_register_marking(_Custom)
return _Custom
return custom_builder
TLP_WHITE = MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="white")
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="white")
)
TLP_GREEN = MarkingDefinition(
id="marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="green")
id="marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="green")
)
TLP_AMBER = MarkingDefinition(
id="marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="amber")
id="marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="amber")
)
TLP_RED = MarkingDefinition(
id="marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="red")
id="marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
created="2017-01-20T00:00:00.000Z",
definition_type="tlp",
definition=TLPMarking(tlp="red")
)
COMMON_PROPERTIES = {
# 'type' and 'id' should be defined on each individual type
'created': TimestampProperty(default=lambda: NOW, precision='millisecond'),
'modified': TimestampProperty(default=lambda: NOW, precision='millisecond'),
'external_references': ListProperty(ExternalReference),
'revoked': BooleanProperty(),
'labels': ListProperty(StringProperty),
'created_by_ref': ReferenceProperty(type="identity"),
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),
'granular_markings': ListProperty(GranularMarking),
}
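A usage sketch of the new CustomMarking decorator and marking registry (the x-acme-marking type and its property are illustrative):

from stix2 import CustomMarking, MarkingDefinition
from stix2.properties import StringProperty

@CustomMarking('x-acme-marking', [
    ('handling_caveat', StringProperty(required=True)),
])
class AcmeMarking(object):
    pass

# once registered, the custom type is accepted wherever a TLP or Statement marking is
md = MarkingDefinition(
    definition_type='x-acme-marking',
    definition=AcmeMarking(handling_caveat='internal-only'),
)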

View File

@ -1,5 +1,9 @@
"""STIX 2.0 Objects that are neither SDOs nor SROs"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from . import exceptions
from .base import _STIXBase
@ -31,12 +35,13 @@ class STIXObjectProperty(Property):
class Bundle(_STIXBase):
_type = 'bundle'
_properties = {
'type': TypeProperty(_type),
'id': IDProperty(_type),
'spec_version': Property(fixed="2.0"),
'objects': ListProperty(STIXObjectProperty),
}
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('spec_version', Property(fixed="2.0")),
('objects', ListProperty(STIXObjectProperty)),
])
def __init__(self, *args, **kwargs):
# Add any positional arguments to the 'objects' kwarg.
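For example (sketch), positional arguments end up in the bundle's objects list:

from stix2 import Bundle, Indicator

indicator = Indicator(labels=["malicious-activity"],
                      pattern="[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']")
bundle = Bundle(indicator)   # equivalent to Bundle(objects=[indicator])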

View File

@ -121,7 +121,7 @@ class DependentPropertiesError(STIXError, TypeError):
def __str__(self):
msg = "The property dependencies for {0}: ({1}) are not met."
return msg.format(self.cls.__name__,
", ".join(x for x in self.dependencies))
", ".join(x for x, y in self.dependencies))
class AtLeastOnePropertyError(STIXError, TypeError):

File diff suppressed because it is too large

View File

@ -6,6 +6,7 @@ import re
import uuid
from six import string_types, text_type
from stix2patterns.validator import run_validator
from .base import _STIXBase
from .exceptions import DictionaryKeyError
@ -371,3 +372,17 @@ class EnumProperty(StringProperty):
if value not in self.allowed:
raise ValueError("value '%s' is not valid for this enumeration." % value)
return self.string_type(value)
class PatternProperty(StringProperty):
def __init__(self, **kwargs):
super(PatternProperty, self).__init__(**kwargs)
def clean(self, value):
str_value = super(PatternProperty, self).clean(value)
errors = run_validator(str_value)
if errors:
raise ValueError(str(errors[0]))
return self.string_type(value)
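A sketch of the effect (the patterns below are illustrative): strings that the stix2patterns validator flags are rejected with a ValueError:

from stix2.properties import PatternProperty

prop = PatternProperty()
prop.clean("[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']")    # valid, returned unchanged
try:
    prop.clean("file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e'")  # missing the enclosing brackets
except ValueError as err:
    print(err)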

View File

@ -1,220 +1,316 @@
"""STIX 2.0 Domain Objects"""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
import stix2
from .base import _STIXBase
from .common import COMMON_PROPERTIES, KillChainPhase
from .common import ExternalReference, GranularMarking, KillChainPhase
from .observables import ObservableProperty
from .properties import (IDProperty, IntegerProperty, ListProperty,
ReferenceProperty, StringProperty, TimestampProperty,
TypeProperty)
from .properties import (BooleanProperty, IDProperty, IntegerProperty,
ListProperty, PatternProperty, ReferenceProperty,
StringProperty, TimestampProperty, TypeProperty)
from .utils import NOW
class AttackPattern(_STIXBase):
_type = 'attack-pattern'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'name': StringProperty(required=True),
'description': StringProperty(),
'kill_chain_phases': ListProperty(KillChainPhase),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Campaign(_STIXBase):
_type = 'campaign'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'name': StringProperty(required=True),
'description': StringProperty(),
'aliases': ListProperty(StringProperty),
'first_seen': TimestampProperty(),
'last_seen': TimestampProperty(),
'objective': StringProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('aliases', ListProperty(StringProperty)),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('objective', StringProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class CourseOfAction(_STIXBase):
_type = 'course-of-action'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'name': StringProperty(required=True),
'description': StringProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Identity(_STIXBase):
_type = 'identity'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'name': StringProperty(required=True),
'description': StringProperty(),
'identity_class': StringProperty(required=True),
'sectors': ListProperty(StringProperty),
'contact_information': StringProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('identity_class', StringProperty(required=True)),
('sectors', ListProperty(StringProperty)),
('contact_information', StringProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Indicator(_STIXBase):
_type = 'indicator'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'labels': ListProperty(StringProperty, required=True),
'name': StringProperty(),
'description': StringProperty(),
'pattern': StringProperty(required=True),
'valid_from': TimestampProperty(default=lambda: NOW),
'valid_until': TimestampProperty(),
'kill_chain_phases': ListProperty(KillChainPhase),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('labels', ListProperty(StringProperty, required=True)),
('name', StringProperty()),
('description', StringProperty()),
('pattern', PatternProperty(required=True)),
('valid_from', TimestampProperty(default=lambda: NOW)),
('valid_until', TimestampProperty()),
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class IntrusionSet(_STIXBase):
_type = 'intrusion-set'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'name': StringProperty(required=True),
'description': StringProperty(),
'aliases': ListProperty(StringProperty),
'first_seen': TimestampProperty(),
'last_seen ': TimestampProperty(),
'goals': ListProperty(StringProperty),
'resource_level': StringProperty(),
'primary_motivation': StringProperty(),
'secondary_motivations': ListProperty(StringProperty),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('aliases', ListProperty(StringProperty)),
('first_seen', TimestampProperty()),
('last_seen ', TimestampProperty()),
('goals', ListProperty(StringProperty)),
('resource_level', StringProperty()),
('primary_motivation', StringProperty()),
('secondary_motivations', ListProperty(StringProperty)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Malware(_STIXBase):
_type = 'malware'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'labels': ListProperty(StringProperty, required=True),
'name': StringProperty(required=True),
'description': StringProperty(),
'kill_chain_phases': ListProperty(KillChainPhase),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class ObservedData(_STIXBase):
_type = 'observed-data'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'first_observed': TimestampProperty(required=True),
'last_observed': TimestampProperty(required=True),
'number_observed': IntegerProperty(required=True),
'objects': ObservableProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_observed', TimestampProperty(required=True)),
('last_observed', TimestampProperty(required=True)),
('number_observed', IntegerProperty(required=True)),
('objects', ObservableProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Report(_STIXBase):
_type = 'report'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'labels': ListProperty(StringProperty, required=True),
'name': StringProperty(required=True),
'description': StringProperty(),
'published': TimestampProperty(),
'object_refs': ListProperty(ReferenceProperty),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('published', TimestampProperty()),
('object_refs', ListProperty(ReferenceProperty)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class ThreatActor(_STIXBase):
_type = 'threat-actor'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'labels': ListProperty(StringProperty, required=True),
'name': StringProperty(required=True),
'description': StringProperty(),
'aliases': ListProperty(StringProperty),
'roles': ListProperty(StringProperty),
'goals': ListProperty(StringProperty),
'sophistication': StringProperty(),
'resource_level': StringProperty(),
'primary_motivation': StringProperty(),
'secondary_motivations': ListProperty(StringProperty),
'personal_motivations': ListProperty(StringProperty),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('aliases', ListProperty(StringProperty)),
('roles', ListProperty(StringProperty)),
('goals', ListProperty(StringProperty)),
('sophistication', StringProperty()),
('resource_level', StringProperty()),
('primary_motivation', StringProperty()),
('secondary_motivations', ListProperty(StringProperty)),
('personal_motivations', ListProperty(StringProperty)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Tool(_STIXBase):
_type = 'tool'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'labels': ListProperty(StringProperty, required=True),
'name': StringProperty(required=True),
'description': StringProperty(),
'kill_chain_phases': ListProperty(KillChainPhase),
'tool_version': StringProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('kill_chain_phases', ListProperty(KillChainPhase)),
('tool_version', StringProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty, required=True)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
class Vulnerability(_STIXBase):
_type = 'vulnerability'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'type': TypeProperty(_type),
'id': IDProperty(_type),
'name': StringProperty(required=True),
'description': StringProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
def CustomObject(type='x-custom-type', properties={}):
def CustomObject(type='x-custom-type', properties=None):
"""Custom STIX Object type decorator
Example 1:
@CustomObject('x-type-name', {
'property1': StringProperty(required=True),
'property2': IntegerProperty(),
})
@CustomObject('x-type-name', [
('property1', StringProperty(required=True)),
('property2', IntegerProperty()),
])
class MyNewObjectType():
pass
Supply an __init__() function to add any special validations to the custom
type. Don't call super().__init() though - doing so will cause an error.
type. Don't call super().__init__() though - doing so will cause an error.
Example 2:
@CustomObject('x-type-name', {
'property1': StringProperty(required=True),
'property2': IntegerProperty(),
})
@CustomObject('x-type-name', [
('property1', StringProperty(required=True)),
('property2', IntegerProperty()),
])
class MyNewObjectType():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
@ -225,12 +321,31 @@ def CustomObject(type='x-custom-type', properties={}):
class _Custom(cls, _STIXBase):
_type = type
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'id': IDProperty(_type),
'type': TypeProperty(_type),
})
_properties.update(properties)
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
])
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update([x for x in properties if not x[0].startswith("x_")])
# This is to follow the general properties structure.
_properties.update([
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
# Put all custom properties at the bottom, sorted alphabetically.
_properties.update(sorted([x for x in properties if x[0].startswith("x_")], key=lambda x: x[0]))
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
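A usage sketch of the revised list-of-tuples API (the x-animal type and its properties are illustrative): declared properties slot in after the common header fields, x_ properties sort to the bottom, and a supplied __init__ can add extra validation:

from stix2.sdo import CustomObject
from stix2.properties import IntegerProperty, StringProperty

@CustomObject('x-animal', [
    ('species', StringProperty(required=True)),
    ('x_leg_count', IntegerProperty()),
])
class Animal(object):
    def __init__(self, x_leg_count=None, **kwargs):
        if x_leg_count and x_leg_count < 0:
            raise ValueError("x_leg_count must be non-negative")

animal = Animal(species='lion', x_leg_count=4)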

File diff suppressed because it is too large

stix2/sources/filesystem.py (new file, 188 lines)
View File

@ -0,0 +1,188 @@
"""
Python STIX 2.0 FileSystem Source/Sink
Classes:
FileSystemStore
FileSystemSink
FileSystemSource
TODO: Test everything
"""
import json
import os
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, Filter
class FileSystemStore(DataStore):
"""
"""
def __init__(self, name="FileSystemStore", stix_dir="stix_data"):
super(FileSystemStore, self).__init__(name=name)
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir)
class FileSystemSink(DataSink):
"""
"""
def __init__(self, name="FileSystemSink", stix_dir="stix_data"):
super(FileSystemSink, self).__init__(name=name)
self.stix_dir = os.path.abspath(stix_dir)
# check directory path exists
if not os.path.exists(self.stix_dir):
print("Error: directory path for STIX data does not exist")
@property
def stix_dir(self):
# back the property with a private attribute to avoid recursive getter/setter calls
return self._stix_dir
@stix_dir.setter
def stix_dir(self, dir):
self._stix_dir = dir
def add(self, stix_objs=None):
"""
Q: bundlify or no?
"""
if not stix_objs:
stix_objs = []
for stix_obj in stix_objs:
path = os.path.join(self.stix_dir, stix_obj["type"], stix_obj["id"])
json.dump(Bundle([stix_obj]), open(path, 'w+'), indent=4)
class FileSystemSource(DataSource):
"""
"""
def __init__(self, name="FileSystemSource", stix_dir="stix_data"):
super(FileSystemSource, self).__init__(name=name)
self.stix_dir = os.path.abspath(stix_dir)
# check directory path exists
if not os.path.exists(self.stix_dir):
print("Error: directory path for STIX data does not exist")
@property
def stix_dir(self):
# back the property with a private attribute to avoid recursive getter/setter calls
return self._stix_dir
@stix_dir.setter
def stix_dir(self, dir_):
self._stix_dir = dir_
def get(self, stix_id, _composite_filters=None):
"""
"""
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
return stix_obj
def all_versions(self, stix_id, _composite_filters=None):
"""
Notes:
Since FileSystem sources/sinks don't handle multiple versions
of a STIX object, this operation is unnecessary. Pass call to get().
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):
"""
"""
all_data = []
if query is None:
query = []
# combine all query filters
if self.filters:
query.extend(self.filters.values())
if _composite_filters:
query.extend(_composite_filters)
# extract any filters that are for "type" or "id", as we can then do
# filtering before reading in the STIX objects. A STIX 'type' filter
# can reduce the query to a single sub-directory, and a STIX 'id' filter
# allows checking file names without loading file contents.
file_filters = self._parse_file_filters(query)
# establish which subdirectories can be avoided in the query
# by excluding as many as possible. A filter with "type" as the field
# means that certain STIX object types can be ruled out, and thus
# the corresponding subdirectories as well
include_paths = []
declude_paths = []
if "type" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_.field == "type":
if filter_.op == "=":
include_paths.append(os.path.join(self.stix_dir, filter_.value))
elif filter_.op == "!=":
declude_paths.append(os.path.join(self.stix_dir, filter_.value))
else:
# have to walk entire STIX directory
include_paths.append(self.stix_dir)
# if a user specifies a "type" filter like "type = <stix-object_type>",
# the filter reduces the search space to a single STIX object type
# (and thus a single directory). This makes such a filter more powerful
# than "type != <stix-object_type>", because the latter subtracts
# only one STIX object type (and thus only one directory).
# As such, the former kind of filter is given preference over the latter;
# i.e. if both appear in a query, the latter kind is ignored
if not include_paths:
# user has specified types that are not wanted (i.e. "!=")
# so query will look in all STIX directories that are not
# the specified type. Compile correct dir paths
for dir_ in os.listdir(self.stix_dir):
if os.path.abspath(dir_) not in declude_paths:
include_paths.append(os.path.abspath(dir_))
# grab stix object ID as well - if present in filters, as
# may forgo the loading of STIX content into memory
if "id" in [filter_.field for filter_ in file_filters]:
for filter_ in file_filters:
if filter_.field == "id" and filter_.op == "=":
id_ = filter_.value
break
else:
id_ = None
else:
id_ = None
# now iterate through all STIX objs
for path in include_paths:
for root, dirs, files in os.walk(path):
for file_ in files:
if id_:
if id_ == file_.split(".")[0]:
# since ID is specified in one of filters, can evaluate against filename first without loading
stix_obj = json.load(file_)["objects"]
# check against other filters, add if match
all_data.extend(self.apply_common_filters([stix_obj], query))
else:
# have to load into memory regardless to evaluate other filters
stix_obj = json.load(file_)["objects"]
all_data.extend(self.apply_common_filters([stix_obj], query))
all_data = self.deduplicate(all_data)
return all_data
def _parse_file_filters(self, query):
"""
"""
file_filters = []
for filter_ in query:
if filter_.field == "id" or filter_.field == "type":
file_filters.append(filter_)
return file_filters
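A standalone sketch of the directory-pruning idea described in the comments above (the paths and filters are illustrative):

import os
from stix2.sources.filters import Filter

stix_dir = "stix_data"
file_filters = [Filter("type", "=", "indicator"), Filter("type", "!=", "malware")]

include_paths, declude_paths = [], []
for f in file_filters:
    if f.field == "type" and f.op == "=":
        include_paths.append(os.path.join(stix_dir, f.value))
    elif f.field == "type" and f.op == "!=":
        declude_paths.append(os.path.join(stix_dir, f.value))

# "=" filters take precedence: only stix_data/indicator is searched,
# and the "!=" exclusion is ignored
print(include_paths or declude_paths)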

stix2/sources/filters.py (new file, 204 lines)
View File

@ -0,0 +1,204 @@
"""
Filters for Python STIX 2.0 DataSources, DataSinks, DataStores
Classes:
Filter
TODO: The script at the bottom of the module works (to capture
all the callable filter methods), however it causes this module
to be imported by itself twice. Not sure how big of a deal that is,
or whether a cleaner solution is possible.
"""
import collections
import types
# Currently, filtering is supported only on STIX 2.0 common SDO fields
# that are not complex objects
STIX_COMMON_FIELDS = [
"created",
"created_by_ref",
"external_references.source_name",
"external_references.description",
"external_references.url",
"external_references.hashes",
"external_references.external_id",
"granular_markings.marking_ref",
"granular_markings.selectors",
"id",
"labels",
"modified",
"object_marking_refs",
"revoked",
"type",
"granular_markings"
]
# Supported filter operations
FILTER_OPS = ['=', '!=', 'in', '>', '<', '>=', '<=']
# Supported filter value types
FILTER_VALUE_TYPES = [bool, dict, float, int, list, str, tuple]
# filter lookup map - STIX 2 common fields -> filter method
STIX_COMMON_FILTERS_MAP = {}
class Filter(collections.namedtuple("Filter", ['field', 'op', 'value'])):
__slots__ = ()
def __new__(cls, field, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
self = super(Filter, cls).__new__(cls, field, op, value)
return self
# primitive type filters
def _all_filter(filter_, stix_obj_field):
"""all filter operations (for filters whose value type can be applied to any operation type)"""
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
elif filter_.op == "in":
return stix_obj_field in filter_.value
elif filter_.op == ">":
return stix_obj_field > filter_.value
elif filter_.op == "<":
return stix_obj_field < filter_.value
elif filter_.op == ">=":
return stix_obj_field >= filter_.value
elif filter_.op == "<=":
return stix_obj_field <= filter_.value
else:
return -1
def _id_filter(filter_, stix_obj_id):
"""base filter types"""
if filter_.op == "=":
return stix_obj_id == filter_.value
elif filter_.op == "!=":
return stix_obj_id != filter_.value
else:
return -1
def _boolean_filter(filter_, stix_obj_field):
if filter_.op == "=":
return stix_obj_field == filter_.value
elif filter_.op == "!=":
return stix_obj_field != filter_.value
else:
return -1
def _string_filter(filter_, stix_obj_field):
return _all_filter(filter_, stix_obj_field)
def _timestamp_filter(filter_, stix_obj_timestamp):
return _all_filter(filter_, stix_obj_timestamp)
# STIX 2.0 Common Property filters
# The naming of these functions is important as
# they are used to index a mapping dictionary from
# STIX common field names to these filter functions.
#
# REQUIRED naming scheme:
# "check_<STIX field name>_filter"
def check_created_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["created"])
def check_created_by_ref_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["created_by_ref"])
def check_external_references_filter(filter_, stix_obj):
"""
STIX objects can have a list of external references
external_references properties:
external_references.source_name (string)
external_references.description (string)
external_references.url (string)
external_references.hashes (hash, but for filtering purposes, a string)
external_references.external_id (string)
"""
for er in stix_obj["external_references"]:
# grab er property name from filter field
filter_field = filter_.field.split(".")[1]
r = _string_filter(filter_, er[filter_field])
if r:
return r
return False
def check_granular_markings_filter(filter_, stix_obj):
"""
STIX objects can have a list of granular marking references
granular_markings properties:
granular_markings.marking_ref (id)
granular_markings.selectors (string)
"""
for gm in stix_obj["granular_markings"]:
# grab gm property name from filter field
filter_field = filter_.field.split(".")[1]
if filter_field == "marking_ref":
return _id_filter(filter_, gm[filter_field])
elif filter_field == "selectors":
for selector in gm[filter_field]:
r = _string_filter(filter_, selector)
if r:
return r
return False
def check_id_filter(filter_, stix_obj):
return _id_filter(filter_, stix_obj["id"])
def check_labels_filter(filter_, stix_obj):
for label in stix_obj["labels"]:
r = _string_filter(filter_, label)
if r:
return r
return False
def check_modified_filter(filter_, stix_obj):
return _timestamp_filter(filter_, stix_obj["modified"])
def check_object_marking_refs_filter(filter_, stix_obj):
for marking_id in stix_obj["object_marking_refs"]:
r = _id_filter(filter_, marking_id)
if r:
return r
return False
def check_revoked_filter(filter_, stix_obj):
return _boolean_filter(filter_, stix_obj["revoked"])
def check_type_filter(filter_, stix_obj):
return _string_filter(filter_, stix_obj["type"])
# Create mapping of field names to filter functions
for name, obj in dict(globals()).items():
if "check_" in name and isinstance(obj, types.FunctionType):
field_name = "_".join(name.split("_")[1:-1])
STIX_COMMON_FILTERS_MAP[field_name] = obj
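For example (sketch), a common-field filter can be looked up and applied through the map built above:

from stix2.sources.filters import Filter, STIX_COMMON_FILTERS_MAP

f = Filter("type", "=", "indicator")
stix_obj = {"type": "indicator"}
print(STIX_COMMON_FILTERS_MAP[f.field](f, stix_obj))   # True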

stix2/sources/memory.py (new file, 261 lines)
View File

@ -0,0 +1,261 @@
"""
Python STIX 2.0 Memory Source/Sink
Classes:
MemoryStore
MemorySink
MemorySource
TODO: Test everything.
TODO: Use deduplicate() calls only when memory corpus is dirty (been added to)
can save a lot of time for successive queries
Notes:
Not worrying about STIX versioning. The in-memory STIX data will only
ever hold one version of each STIX object, so when save() is called,
those single versions are what get written to file.
"""
import json
import os
from stix2validator import validate_string
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore
from stix2.sources.filters import Filter
class MemoryStore(DataStore):
"""
"""
def __init__(self, name="MemoryStore", stix_data=None):
"""
Notes:
It doesn't make sense to create a MemoryStore by passing
in existing MemorySource and MemorySink because there could
be data concurrency issues. Just as easy to create new MemoryStore.
"""
super(MemoryStore, self).__init__(name=name)
self.data = {}
if stix_data:
if type(stix_data) == dict:
# stix objects are in a bundle
# verify STIX json data
r = validate_string(json.dumps(stix_data))
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
elif type(stix_data) == list:
# stix objects are in a list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
self.source = MemorySource(stix_data=self.data, _store=True)
self.sink = MemorySink(stix_data=self.data, _store=True)
def save_to_file(self, file_path):
return self.sink.save_to_file(file_path=file_path)
def load_from_file(self, file_path):
return self.source.load_from_file(file_path=file_path)
class MemorySink(DataSink):
"""
"""
def __init__(self, name="MemorySink", stix_data=None, _store=False):
"""
Args:
stix_data (dictionary OR list): valid STIX 2.0 content in
bundle or a list.
name (string): optional name tag of the data source
_store (bool): if the MemorySink is a part of a DataStore,
in which case "stix_data" is a direct reference to
shared memory with DataSource.
"""
super(MemorySink, self).__init__(name=name)
if _store:
self.data = stix_data
else:
self.data = {}
if stix_data:
if type(stix_data) == dict:
# stix objects are in a bundle
# verify STIX json data
r = validate_string(json.dumps(stix_data))
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
self.data = {}
elif type(stix_data) == list:
# stix objects are in a list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
def add(self, stix_data):
"""
"""
if type(stix_data) == dict:
# stix data is in bundle
r = validate_string(json.dumps(stix_data))
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
elif type(stix_data) == list:
# stix data is in list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
def save_to_file(self, file_path):
"""
"""
json.dump(Bundle(self.data.values()), file_path, indent=4)
class MemorySource(DataSource):
def __init__(self, name="MemorySource", stix_data=None, _store=False):
"""
Args:
stix_data (dictionary OR list): valid STIX 2.0 content in
bundle or list.
name (string): optional name tag of the data source.
_store (bool): if the MemorySource is a part of a DataStore,
in which case "stix_data" is a direct reference to shared
memory with DataSink.
"""
super(MemorySource, self).__init__(name=name)
if _store:
self.data = stix_data
else:
self.data = {}
if stix_data:
if type(stix_data) == dict:
# STIX objects are in a bundle
# verify STIX json data
r = validate_string(json.dumps(stix_data))
# make dictionary of the objects for easy lookup
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySource() was found to not be validated by STIX 2 Validator")
print(r.as_dict())
self.data = {}
elif type(stix_data) == list:
# STIX objects are in a list
for stix_obj in stix_data:
r = validate_string(json.dumps(stix_obj))
if r.is_valid:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX object %s is not valid under STIX 2 validator." % stix_obj["id"])
print(r)
else:
raise ValueError("stix_data must be in bundle format or raw list")
def get(self, stix_id, _composite_filters=None):
"""
"""
if _composite_filters is None:
# if get call is only based on 'id', no need to search, just retrieve from dict
try:
stix_obj = self.data[stix_id]
except KeyError:
stix_obj = None
return stix_obj
# if there are filters from the composite level, process full query
query = [Filter("id", "=", stix_id)]
all_data = self.query(query=query, _composite_filters=_composite_filters)
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
return stix_obj
def all_versions(self, stix_id, _composite_filters=None):
"""
Notes:
Since Memory sources/sinks don't handle multiple versions of a
STIX object, this operation is unnecessary. Translate call to get().
"""
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
def query(self, query=None, _composite_filters=None):
"""
"""
if query is None:
query = []
# combine all query filters
if self.filters:
query.extend(self.filters.values())
if _composite_filters:
query.extend(_composite_filters)
# deduplicate data before filtering -> Deduplication is not required as Memory only ever holds one version of an object
# all_data = self.deduplicate(all_data)
# apply STIX common property filters
all_data = self.apply_common_filters(self.data.values(), query)
return all_data
def load_from_file(self, file_path):
"""
"""
file_path = os.path.abspath(file_path)
stix_data = json.load(open(file_path, "r"))
r = validate_string(json.dumps(stix_data))
if r.is_valid:
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: STIX data loaded from file (%s) was found to not be validated by STIX 2 Validator" % file_path)
print(r)
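A usage sketch (the indicator dictionary is illustrative; it has to be valid STIX 2.0 JSON to pass the stix2validator check):

from stix2.sources.memory import MemoryStore

indicator = {
    "type": "indicator",
    "id": "indicator--1e9e4ab1-7ef6-4e43-a292-0a39a1f4e8af",
    "created": "2017-08-31T00:00:00.000Z",
    "modified": "2017-08-31T00:00:00.000Z",
    "labels": ["malicious-activity"],
    "pattern": "[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']",
    "valid_from": "2017-08-31T00:00:00Z",
}
store = MemoryStore(stix_data=[indicator])
print(store.source.get(indicator["id"])["type"])   # 'indicator'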

View File

@ -1,132 +1,97 @@
import requests
from requests.auth import HTTPBasicAuth
"""
Python STIX 2.0 TAXII Source/Sink
from stix2.sources import DataSource
Classes:
TAXIICollectionStore
TAXIICollectionSink
TAXIICollectionSource
# TODO: -Should we make properties for the TAXIIDataSource address and other
# possible variables that are found in "self.taxii_info"
TODO: Test everything
"""
import json
from stix2.sources import DataSink, DataSource, DataStore, make_id
from stix2.sources.filters import Filter
TAXII_FILTERS = ['added_after', 'id', 'type', 'version']
test = True
class TAXIIDataSource(DataSource):
"""STIX 2.0 Data Source - TAXII 2.0 module"""
def __init__(self, api_root=None, auth=None, name="TAXII"):
super(TAXIIDataSource, self).__init__(name=name)
if not api_root:
api_root = "http://localhost:5000"
if not auth:
auth = {"user": "admin", "pass": "taxii"}
self.taxii_info = {
"api_root": {
"url": api_root
},
"auth": auth
}
if test:
return
try:
# check api-root is reachable/exists and grab api collections
coll_url = self.taxii_info['api_root']['url'] + "/collections/"
headers = {}
resp = requests.get(coll_url,
headers=headers,
auth=HTTPBasicAuth(self.taxii_info['auth']['user'],
self.taxii_info['auth']['pass']))
# TESTING
# print("\n-------__init__() ----\n")
# print(resp.text)
# print("\n")
# print(resp.status_code)
# END TESTING
# raise http error if request returned error code
resp.raise_for_status()
resp_json = resp.json()
try:
self.taxii_info['api_root']['collections'] = resp_json['collections']
except KeyError as e:
if e == "collections":
raise
# raise type(e), type(e)(e.message +
# "To connect to the TAXII collections, the API root
# resource must contain a collection endpoint URL.
# This was not found in the API root resource received
# from the API root" ), sys.exc_info()[2]
except requests.ConnectionError as e:
raise
# raise type(e), type(e)(e.message +
# "Attempting to connect to %s" % coll_url)
def get(self, id_, _composite_filters=None):
"""Get STIX 2.0 object from TAXII source by specified 'id'
Notes:
Just pass _composite_filters to the query() as they are applied
there. de-duplication of results is also done within query()
class TAXIICollectionStore(DataStore):
"""
"""
def __init__(self, collection, name="TAXIICollectionStore"):
"""
Create a new TAXII Collection Data store
Args:
id_ (str): id of STIX object to retrieve
_composite_filters (list): filters passed from a Composite Data
Source (if this data source is attached to one)
Returns:
collection (taxii2.Collection): Collection instance
"""
super(TAXIICollectionStore, self).__init__(name=name)
self.source = TAXIICollectionSource(collection)
self.sink = TAXIICollectionSink(collection)
# make query in TAXII query format since 'id' is TAXii field
query = [
{
"field": "match[id]",
"op": "=",
"value": id_
}
]
all_data = self.query(query=query, _composite_filters=_composite_filters)
class TAXIICollectionSink(DataSink):
"""
"""
def __init__(self, collection, name="TAXIICollectionSink"):
super(TAXIICollectionSink, self).__init__(name=name)
self.collection = collection
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
def add(self, stix_obj):
"""
"""
self.collection.add_objects(self.create_bundle([json.loads(str(stix_obj))]))
@staticmethod
def create_bundle(objects):
return dict(id="bundle--%s" % make_id(),
objects=objects,
spec_version="2.0",
type="bundle")
class TAXIICollectionSource(DataSource):
"""
"""
def __init__(self, collection, name="TAXIICollectionSource"):
super(TAXIICollectionSource, self).__init__(name=name)
self.collection = collection
def get(self, stix_id, _composite_filters=None):
"""
"""
# combine all query filters
query = []
if self.filters:
query.extend(self.filters.values())
if _composite_filters:
query.extend(_composite_filters)
# separate taxii query terms (can be done remotely)
taxii_filters = self._parse_taxii_filters(query)
stix_objs = self.collection.get_object(stix_id, taxii_filters)["objects"]
stix_obj = self.apply_common_filters(stix_objs, query)
if len(stix_obj) > 0:
stix_obj = stix_obj[0]
else:
stix_obj = None
return stix_obj
def all_versions(self, id_, _composite_filters=None):
"""Get all versions of STIX 2.0 object from TAXII source by
specified 'id'
Notes:
Just passes _composite_filters to the query() as they are applied
there. de-duplication of results is also done within query()
Args:
id_ (str): id of STIX objects to retrieve
_composite_filters (list): filters passed from a Composite Data
Source (if this data source is attached to one)
Returns:
The query results with filters applied.
def all_versions(self, stix_id, _composite_filters=None):
"""
"""
# make query in TAXII query format since 'id' is TAXII field
query = [
{
"field": "match[id]",
"op": "=",
"value": id_
}
Filter("match[id]", "=", stix_id),
Filter("match[version]", "=", "all")
]
all_data = self.query(query=query, _composite_filters=_composite_filters)
@ -134,84 +99,22 @@ class TAXIIDataSource(DataSource):
return all_data
def query(self, query=None, _composite_filters=None):
"""Query the TAXII data source for STIX objects matching the query
The final full query could contain filters from:
-the current API call
-Composite Data source filters (that are passed in via
'_composite_filters')
-TAXII data source filters that are attached
TAXII filters ['added_after', 'match[<>]'] are extracted and sent
to TAXII if they are present
TODO: Authentication for TAXII
Args:
query(list): list of filters (dicts) to search on
_composite_filters (list): filters passed from a
Composite Data Source (if this data source is attached to one)
Returns:
"""
all_data = []
"""
if query is None:
query = []
# combine all query filters
if self.filters:
query += self.filters.values()
query.extend(self.filters.values())
if _composite_filters:
query += _composite_filters
query.extend(_composite_filters)
# separate taxii query terms (can be done remotely)
taxii_filters = self._parse_taxii_filters(query)
# for each collection endpoint - send query request
for collection in self.taxii_info['api_root']['collections']:
coll_obj_url = "/".join([self.taxii_info['api_root']['url'],
"collections", str(collection['id']),
"objects"])
headers = {}
try:
resp = requests.get(coll_obj_url,
params=taxii_filters,
headers=headers,
auth=HTTPBasicAuth(self.taxii_info['auth']['user'],
self.taxii_info['auth']['pass']))
# TESTING
# print("\n-------query() ----\n")
# print("Request that was sent: \n")
# print(resp.url)
# print("Response: \n")
# print(json.dumps(resp.json(),indent=4))
# print("\n")
# print(resp.status_code)
# print("------------------")
# END TESTING
# raise http error if request returned error code
resp.raise_for_status()
resp_json = resp.json()
# grab all STIX 2.0 objects in json response
for stix_obj in resp_json['objects']:
all_data.append(stix_obj)
except requests.exceptions.RequestException as e:
raise e
# raise type(e), type(e)(e.message +
# "Attempting to connect to %s" % coll_url)
# TODO: Is there a way to collect exceptions while carrying
# on then raise all of them at the end?
# query TAXII collection
all_data = self.collection.get_objects(filters=taxii_filters)["objects"]
# deduplicate data (before filtering as reduces wasted filtering)
all_data = self.deduplicate(all_data)
@ -222,16 +125,13 @@ class TAXIIDataSource(DataSource):
return all_data
def _parse_taxii_filters(self, query):
"""Parse out TAXII filters that the TAXII server can filter on
"""Parse out TAXII filters that the TAXII server can filter on.
TAXII filters should be analogous to how they are supplied
in the url to the TAXII endpoint. For instance
"?match[type]=indicator,sighting" should be in a query dict as follows
{
"field": "match[type]"
"op": "=",
"value": "indicator,sighting"
}
Notes:
For instance - "?match[type]=indicator,sighting" should be in a
query dict as follows:
Filter("type", "=", "indicator,sighting")
Args:
query (list): list of filters to extract which ones are TAXII
@ -240,23 +140,15 @@ class TAXIIDataSource(DataSource):
Returns:
params (dict): dict of the TAXII filters but in format required
for 'requests.get()'.
"""
"""
params = {}
for q in query:
if q['field'] in TAXII_FILTERS:
if q['field'] == 'added_after':
params[q['field']] = q['value']
for filter_ in query:
if filter_.field in TAXII_FILTERS:
if filter_.field == "added_after":
params[filter_.field] = filter_.value
else:
taxii_field = 'match[' + q['field'] + ']'
params[taxii_field] = q['value']
taxii_field = "match[%s]" % filter_.field
params[taxii_field] = filter_.value
return params
def close(self):
"""Close down the Data Source - if any clean up is required.
"""
pass
# TODO: - getters/setters (properties) for TAXII config info

View File

@ -1,31 +1,42 @@
"""STIX 2.0 Relationship Objects."""
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from .base import _STIXBase
from .common import COMMON_PROPERTIES
from .properties import (IDProperty, IntegerProperty, ListProperty,
ReferenceProperty, StringProperty, TimestampProperty,
TypeProperty)
from .common import ExternalReference, GranularMarking
from .properties import (BooleanProperty, IDProperty, IntegerProperty,
ListProperty, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
from .utils import NOW
class Relationship(_STIXBase):
_type = 'relationship'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'id': IDProperty(_type),
'type': TypeProperty(_type),
'relationship_type': StringProperty(required=True),
'description': StringProperty(),
'source_ref': ReferenceProperty(required=True),
'target_ref': ReferenceProperty(required=True),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('relationship_type', StringProperty(required=True)),
('description', StringProperty()),
('source_ref', ReferenceProperty(required=True)),
('target_ref', ReferenceProperty(required=True)),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
# Explicitly define the first three kwargs to make readable Relationship declarations.
def __init__(self, source_ref=None, relationship_type=None, target_ref=None,
**kwargs):
# TODO:
# - description
def __init__(self, source_ref=None, relationship_type=None,
target_ref=None, **kwargs):
# Allow (source_ref, relationship_type, target_ref) as positional args.
if source_ref and not kwargs.get('source_ref'):
kwargs['source_ref'] = source_ref
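This makes the positional form read naturally, e.g. (the IDs are the placeholder values used elsewhere in these tests):

from stix2 import Relationship

# source_ref, relationship_type and target_ref given positionally.
rel = Relationship("indicator--01234567-89ab-cdef-0123-456789abcdef",
                   "indicates",
                   "malware--fedcba98-7654-3210-fedc-ba9876543210")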
@ -39,24 +50,29 @@ class Relationship(_STIXBase):
class Sighting(_STIXBase):
_type = 'sighting'
_properties = COMMON_PROPERTIES.copy()
_properties.update({
'id': IDProperty(_type),
'type': TypeProperty(_type),
'first_seen': TimestampProperty(),
'last_seen': TimestampProperty(),
'count': IntegerProperty(),
'sighting_of_ref': ReferenceProperty(required=True),
'observed_data_refs': ListProperty(ReferenceProperty(type="observed-data")),
'where_sighted_refs': ListProperty(ReferenceProperty(type="identity")),
'summary': StringProperty(),
})
_properties = OrderedDict()
_properties.update([
('type', TypeProperty(_type)),
('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty()),
('sighting_of_ref', ReferenceProperty(required=True)),
('observed_data_refs', ListProperty(ReferenceProperty(type="observed-data"))),
('where_sighted_refs', ListProperty(ReferenceProperty(type="identity"))),
('summary', BooleanProperty()),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])
# Explicitly define the first kwarg to make readable Sighting declarations.
def __init__(self, sighting_of_ref=None, **kwargs):
# TODO:
# - description
# Allow sighting_of_ref as a positional arg.
if sighting_of_ref and not kwargs.get('sighting_of_ref'):
kwargs['sighting_of_ref'] = sighting_of_ref
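Likewise, a Sighting can be declared with the sighted object's reference as the sole positional argument (placeholder ID):

from stix2 import Sighting

sighting = Sighting("indicator--01234567-89ab-cdef-0123-456789abcdef")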

View File

@ -9,18 +9,18 @@ from .constants import ATTACK_PATTERN_ID
EXPECTED = """{
"type": "attack-pattern",
"id": "attack-pattern--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
"name": "Spear Phishing",
"description": "...",
"external_references": [
{
"external_id": "CAPEC-163",
"source_name": "capec"
"source_name": "capec",
"external_id": "CAPEC-163"
}
],
"id": "attack-pattern--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
"modified": "2016-05-12T08:17:27.000Z",
"name": "Spear Phishing",
"type": "attack-pattern"
]
}"""

View File

@ -4,41 +4,41 @@ import stix2
EXPECTED_BUNDLE = """{
"type": "bundle",
"id": "bundle--00000000-0000-0000-0000-000000000004",
"spec_version": "2.0",
"objects": [
{
"created": "2017-01-01T12:34:56.000Z",
"type": "indicator",
"id": "indicator--00000000-0000-0000-0000-000000000001",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"labels": [
"malicious-activity"
],
"modified": "2017-01-01T12:34:56.000Z",
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"type": "indicator",
"valid_from": "2017-01-01T12:34:56Z"
},
{
"created": "2017-01-01T12:34:56.000Z",
"type": "malware",
"id": "malware--00000000-0000-0000-0000-000000000002",
"labels": [
"ransomware"
],
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"name": "Cryptolocker",
"type": "malware"
"labels": [
"ransomware"
]
},
{
"created": "2017-01-01T12:34:56.000Z",
"type": "relationship",
"id": "relationship--00000000-0000-0000-0000-000000000003",
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"relationship_type": "indicates",
"source_ref": "indicator--01234567-89ab-cdef-0123-456789abcdef",
"target_ref": "malware--fedcba98-7654-3210-fedc-ba9876543210",
"type": "relationship"
"target_ref": "malware--fedcba98-7654-3210-fedc-ba9876543210"
}
],
"spec_version": "2.0",
"type": "bundle"
]
}"""
@ -118,6 +118,20 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh
assert str(bundle) == EXPECTED_BUNDLE
def test_create_bundle_invalid(indicator, malware, relationship):
with pytest.raises(ValueError) as excinfo:
stix2.Bundle(objects=[1])
assert excinfo.value.reason == "This property may only contain a dictionary or object"
with pytest.raises(ValueError) as excinfo:
stix2.Bundle(objects=[{}])
assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object"
with pytest.raises(ValueError) as excinfo:
stix2.Bundle(objects=[{'type': 'bundle'}])
assert excinfo.value.reason == 'This property may not contain a Bundle object'
def test_parse_bundle():
bundle = stix2.parse(EXPECTED_BUNDLE)
@ -128,3 +142,19 @@ def test_parse_bundle():
assert bundle.objects[0].type == 'indicator'
assert bundle.objects[1].type == 'malware'
assert bundle.objects[2].type == 'relationship'
def test_parse_unknown_type():
unknown = {
"type": "other",
"id": "other--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created": "2016-04-06T20:03:00Z",
"modified": "2016-04-06T20:03:00Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"name": "Green Group Attacks Against Finance",
}
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse(unknown)
assert str(excinfo.value) == "Can't parse unknown object type 'other'! For custom types, use the CustomObject decorator."

View File

@ -9,13 +9,13 @@ from .constants import CAMPAIGN_ID
EXPECTED = """{
"created": "2016-04-06T20:03:00.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"type": "campaign",
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:00.000Z",
"modified": "2016-04-06T20:03:00.000Z",
"name": "Green Group Attacks Against Finance",
"type": "campaign"
"description": "Campaign by Green Group against a series of targets in the financial services sector."
}"""

View File

@ -9,13 +9,13 @@ from .constants import COURSE_OF_ACTION_ID
EXPECTED = """{
"created": "2016-04-06T20:03:48.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "This is how to add a filter rule to block inbound access to TCP port 80 to the existing UDP 1434 filter ...",
"type": "course-of-action",
"id": "course-of-action--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:48.000Z",
"modified": "2016-04-06T20:03:48.000Z",
"name": "Add TCP port 80 Filter Rule to the existing Block UDP 1434 Filter",
"type": "course-of-action"
"description": "This is how to add a filter rule to block inbound access to TCP port 80 to the existing UDP 1434 filter ..."
}"""

View File

@ -6,7 +6,7 @@ from .constants import FAKE_TIME
def test_identity_custom_property():
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
stix2.Identity(
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
created="2015-12-21T19:59:11Z",
@ -15,6 +15,7 @@ def test_identity_custom_property():
identity_class="individual",
custom_properties="foobar",
)
assert str(excinfo.value) == "'custom_properties' must be a dictionary"
identity = stix2.Identity(
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
@ -31,7 +32,7 @@ def test_identity_custom_property():
def test_identity_custom_property_invalid():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
stix2.Identity(
id="identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
created="2015-12-21T19:59:11Z",
@ -40,6 +41,9 @@ def test_identity_custom_property_invalid():
identity_class="individual",
x_foo="bar",
)
assert excinfo.value.cls == stix2.Identity
assert excinfo.value.properties == ['x_foo']
assert "Unexpected properties for" in str(excinfo.value)
def test_identity_custom_property_allowed():
@ -67,18 +71,21 @@ def test_identity_custom_property_allowed():
}""",
])
def test_parse_identity_custom_property(data):
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
identity = stix2.parse(data)
assert excinfo.value.cls == stix2.Identity
assert excinfo.value.properties == ['foo']
assert "Unexpected properties for" in str(excinfo.value)
identity = stix2.parse(data, allow_custom=True)
assert identity.foo == "bar"
@stix2.sdo.CustomObject('x-new-type', {
'property1': stix2.properties.StringProperty(required=True),
'property2': stix2.properties.IntegerProperty(),
})
class NewType():
@stix2.sdo.CustomObject('x-new-type', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
])
class NewType(object):
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
@ -88,11 +95,13 @@ def test_custom_object_type():
nt = NewType(property1='something')
assert nt.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError):
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewType(property2=42)
assert "No values for required properties" in str(excinfo.value)
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
NewType(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
def test_parse_custom_object_type():
@ -106,10 +115,24 @@ def test_parse_custom_object_type():
assert nt.property1 == 'something'
@stix2.observables.CustomObservable('x-new-observable', {
'property1': stix2.properties.StringProperty(required=True),
'property2': stix2.properties.IntegerProperty(),
})
def test_parse_unregistered_custom_object_type():
nt_string = """{
"type": "x-foobar-observable",
"created": "2015-12-21T19:59:11Z",
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse(nt_string)
assert "Can't parse unknown object type" in str(excinfo.value)
assert "use the CustomObject decorator." in str(excinfo.value)
@stix2.observables.CustomObservable('x-new-observable', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
('x_property3', stix2.properties.BooleanProperty()),
])
class NewObservable():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
@ -120,11 +143,75 @@ def test_custom_observable_object():
no = NewObservable(property1='something')
assert no.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError):
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewObservable(property2=42)
assert excinfo.value.properties == ['property1']
assert "No values for required properties" in str(excinfo.value)
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
NewObservable(property1='something', property2=4)
assert "'property2' is too small." in str(excinfo.value)
def test_custom_observable_object_invalid_ref_property():
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomObservable('x-new-obs', [
('property_ref', stix2.properties.StringProperty()),
])
class NewObs():
pass
assert "is named like an object reference property but is not an ObjectReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_property():
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomObservable('x-new-obs', [
('property_refs', stix2.properties.StringProperty()),
])
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_refs_list_property():
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomObservable('x-new-obs', [
('property_refs', stix2.properties.ListProperty(stix2.properties.StringProperty)),
])
class NewObs():
pass
assert "is named like an object reference list property but is not a ListProperty containing ObjectReferenceProperty" in str(excinfo.value)
def test_custom_observable_object_invalid_valid_refs():
@stix2.observables.CustomObservable('x-new-obs', [
('property1', stix2.properties.StringProperty(required=True)),
('property_ref', stix2.properties.ObjectReferenceProperty(valid_types='email-addr')),
])
class NewObs():
pass
with pytest.raises(Exception) as excinfo:
NewObs(_valid_refs=['1'],
property1='something',
property_ref='1')
assert "must be created with _valid_refs as a dict, not a list" in str(excinfo.value)
def test_custom_no_properties_raises_exception():
with pytest.raises(ValueError):
@stix2.sdo.CustomObject('x-new-object-type')
class NewObject1(object):
pass
def test_custom_wrong_properties_arg_raises_exception():
with pytest.raises(ValueError):
@stix2.observables.CustomObservable('x-new-object-type', (("prop", stix2.properties.BooleanProperty())))
class NewObject2(object):
pass
def test_parse_custom_observable_object():
@ -133,16 +220,38 @@ def test_parse_custom_observable_object():
"property1": "something"
}"""
nt = stix2.parse_observable(nt_string)
nt = stix2.parse_observable(nt_string, [])
assert nt.property1 == 'something'
def test_parse_unregistered_custom_observable_object():
nt_string = """{
"type": "x-foobar-observable",
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string)
assert "Can't parse unknown observable type" in str(excinfo.value)
def test_parse_invalid_custom_observable_object():
nt_string = """{
"property1": "something"
}"""
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse_observable(nt_string)
assert "Can't parse observable with no 'type' property" in str(excinfo.value)
def test_observable_custom_property():
with pytest.raises(ValueError):
with pytest.raises(ValueError) as excinfo:
NewObservable(
property1='something',
custom_properties="foobar",
)
assert "'custom_properties' must be a dictionary" in str(excinfo.value)
no = NewObservable(
property1='something',
@ -154,11 +263,13 @@ def test_observable_custom_property():
def test_observable_custom_property_invalid():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
NewObservable(
property1='something',
x_foo="bar",
)
assert excinfo.value.properties == ['x_foo']
assert "Unexpected properties for" in str(excinfo.value)
def test_observable_custom_property_allowed():
@ -180,3 +291,107 @@ def test_observed_data_with_custom_observable_object():
allow_custom=True,
)
assert ob_data.objects['0'].property1 == 'something'
@stix2.observables.CustomExtension(stix2.DomainName, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
'property2': stix2.properties.IntegerProperty(),
})
class NewExtension():
def __init__(self, property2=None, **kwargs):
if property2 and property2 < 10:
raise ValueError("'property2' is too small.")
def test_custom_extension():
ext = NewExtension(property1='something')
assert ext.property1 == 'something'
with pytest.raises(stix2.exceptions.MissingPropertiesError) as excinfo:
NewExtension(property2=42)
assert excinfo.value.properties == ['property1']
assert str(excinfo.value) == "No values for required properties for _Custom: (property1)."
with pytest.raises(ValueError) as excinfo:
NewExtension(property1='something', property2=4)
assert str(excinfo.value) == "'property2' is too small."
def test_custom_extension_wrong_observable_type():
ext = NewExtension(property1='something')
with pytest.raises(ValueError) as excinfo:
stix2.File(name="abc.txt",
extensions={
"ntfs-ext": ext,
})
assert 'Cannot determine extension type' in excinfo.value.reason
def test_custom_extension_invalid_observable():
# These extensions are being applied to improperly-created Observables.
# The Observable classes should have been created with the CustomObservable decorator.
class Foo(object):
pass
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomExtension(Foo, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class FooExtension():
pass # pragma: no cover
assert str(excinfo.value) == "'observable' must be a valid Observable class!"
class Bar(stix2.observables._Observable):
pass
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomExtension(Bar, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class BarExtension():
pass
assert "Unknown observable type" in str(excinfo.value)
assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value)
class Baz(stix2.observables._Observable):
_type = 'Baz'
with pytest.raises(ValueError) as excinfo:
@stix2.observables.CustomExtension(Baz, 'x-new-ext', {
'property1': stix2.properties.StringProperty(required=True),
})
class BazExtension():
pass
assert "Unknown observable type" in str(excinfo.value)
assert "Custom observables must be created with the @CustomObservable decorator." in str(excinfo.value)
def test_parse_observable_with_custom_extension():
input_str = """{
"type": "domain-name",
"value": "example.com",
"extensions": {
"x-new-ext": {
"property1": "foo",
"property2": 12
}
}
}"""
parsed = stix2.parse_observable(input_str)
assert parsed.extensions['x-new-ext'].property2 == 12
def test_parse_observable_with_unregistered_custom_extension():
input_str = """{
"type": "domain-name",
"value": "example.com",
"extensions": {
"x-foobar-ext": {
"property1": "foo",
"property2": 12
}
}
}"""
with pytest.raises(ValueError) as excinfo:
stix2.parse_observable(input_str)
assert "Can't parse Unknown extension type" in str(excinfo.value)

View File

@ -1,51 +1,165 @@
from stix2.sources import taxii
import pytest
from taxii2client import Collection
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, make_id, taxii)
from stix2.sources.filters import Filter
from stix2.sources.memory import MemorySource
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
def test_ds_taxii():
ds = taxii.TAXIIDataSource()
assert ds.name == 'TAXII'
class MockTAXIIClient(object):
"""Mock for taxii2_client.TAXIIClient"""
pass
def test_ds_taxii_name():
ds = taxii.TAXIIDataSource(name='My Data Source Name')
@pytest.fixture
def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
@pytest.fixture
def ds():
return DataSource()
IND1 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND2 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND3 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND4 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND5 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND6 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND7 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
IND8 = {
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
STIX_OBJS2 = [IND6, IND7, IND8]
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
def test_ds_smoke():
ds1 = DataSource()
ds2 = DataSink()
ds3 = DataStore(source=ds1, sink=ds2)
with pytest.raises(NotImplementedError):
ds3.add(None)
with pytest.raises(NotImplementedError):
ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
def test_ds_taxii(collection):
ds = taxii.TAXIICollectionSource(collection)
assert ds.name == 'TAXIICollectionSource'
def test_ds_taxii_name(collection):
ds = taxii.TAXIICollectionSource(collection, name='My Data Source Name')
assert ds.name == "My Data Source Name"
def test_ds_params():
url = "http://taxii_url.com:5000"
creds = {"username": "Wade", "password": "Wilson"}
ds = taxii.TAXIIDataSource(api_root=url, auth=creds)
assert ds.taxii_info['api_root']['url'] == url
assert ds.taxii_info['auth'] == creds
def test_parse_taxii_filters():
query = [
{
"field": "added_after",
"op": "=",
"value": "2016-02-01T00:00:01.000Z"
},
{
"field": "id",
"op": "=",
"value": "taxii stix object ID"
},
{
"field": "type",
"op": "=",
"value": "taxii stix object ID"
},
{
"field": "version",
"op": "=",
"value": "first"
},
{
"field": "created_by_ref",
"op": "=",
"value": "Bane"
}
Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
Filter("id", "=", "taxii stix object ID"),
Filter("type", "=", "taxii stix object ID"),
Filter("version", "=", "first"),
Filter("created_by_ref", "=", "Bane"),
]
expected_params = {
@ -55,81 +169,289 @@ def test_parse_taxii_filters():
"match[version]": "first"
}
ds = taxii.TAXIIDataSource()
ds = taxii.TAXIICollectionSource(collection)
taxii_filters = ds._parse_taxii_filters(query)
assert taxii_filters == expected_params
def test_add_get_remove_filter():
class dummy(object):
x = 4
obj_1 = dummy()
def test_add_get_remove_filter(ds):
# The first 3 filters are valid; the remaining filters are erroneous in some way
filters = [
valid_filters = [
Filter('type', '=', 'malware'),
Filter('id', '!=', 'stix object id'),
Filter('labels', 'in', ["heartbleed", "malicious-activity"]),
]
invalid_filters = [
Filter('description', '=', 'not supported field - just place holder'),
Filter('modified', '*', 'not supported operator - just place holder'),
Filter('created', '=', object()),
]
assert len(ds.filters) == 0
ds.add_filter(valid_filters[0])
assert len(ds.filters) == 1
# Adding the same filter again has no effect since `filters` uses a set
ds.add_filter(valid_filters[0])
assert len(ds.filters) == 1
ds.add_filter(valid_filters[1])
assert len(ds.filters) == 2
ds.add_filter(valid_filters[2])
assert len(ds.filters) == 3
# TODO: make better error messages
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[0])
assert str(excinfo.value) == "Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[1])
assert str(excinfo.value) == "Filter operation(from 'op' field) not supported"
with pytest.raises(ValueError) as excinfo:
ds.add_filter(invalid_filters[2])
assert str(excinfo.value) == "Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
assert set(valid_filters) == ds.filters
# remove
ds.filters.remove(valid_filters[0])
assert len(ds.filters) == 2
ds.add_filters(valid_filters)
def test_apply_common_filters(ds):
stix_objs = [
{
"field": "type",
"op": '=',
"value": "malware"
"created": "2017-01-27T13:49:53.997Z",
"description": "\n\nTITLE:\n\tPoison Ivy",
"id": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"labels": [
"remote-access-trojan"
],
"modified": "2017-01-27T13:49:53.997Z",
"name": "Poison Ivy",
"type": "malware"
},
{
"field": "id",
"op": "!=",
"value": "stix object id"
"created": "2014-05-08T09:00:00.000Z",
"id": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"labels": [
"file-hash-watchlist"
],
"modified": "2014-05-08T09:00:00.000Z",
"name": "File hash for Poison Ivy variant",
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
"type": "indicator",
"valid_from": "2014-05-08T09:00:00.000000Z"
},
{
"field": "labels",
"op": "in",
"value": ["heartbleed", "malicious-activity"]
},
{
"field": "revoked",
"value": "filter missing \'op\' field"
},
{
"field": "granular_markings",
"op": "=",
"value": "not supported field - just place holder"
},
{
"field": "modified",
"op": "*",
"value": "not supported operator - just place holder"
},
{
"field": "created",
"op": "=",
"value": obj_1
"created": "2014-05-08T09:00:00.000Z",
"granular_markings": [
{
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
"selectors": [
"relationship_type"
]
}
],
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
"modified": "2014-05-08T09:00:00.000Z",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"relationship_type": "indicates",
"revoked": True,
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
}
]
expected_errors = [
"Filter was missing a required field(key). Each filter requires 'field', 'op', 'value' keys.",
"Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported",
"Filter operation(from 'op' field) not supported",
"Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary"
filters = [
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("revoked", "?", False),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
]
ds = taxii.TAXIIDataSource()
# add
ids, statuses = ds.add_filter(filters)
resp = ds.apply_common_filters(stix_objs, [filters[0]])
ids = [r['id'] for r in resp]
assert stix_objs[0]['id'] in ids
assert stix_objs[1]['id'] in ids
# 7 filters should have been successfully added
assert len(ids) == 7
resp = ds.apply_common_filters(stix_objs, [filters[1]])
assert resp[0]['id'] == stix_objs[2]['id']
# all filters added to data source
for idx, status in enumerate(statuses):
assert status['filter'] == filters[idx]
resp = ds.apply_common_filters(stix_objs, [filters[2]])
assert resp[0]['id'] == stix_objs[0]['id']
# proper status warnings were triggered
assert statuses[3]['errors'][0] == expected_errors[0]
assert statuses[4]['errors'][0] == expected_errors[1]
assert statuses[5]['errors'][0] == expected_errors[2]
assert statuses[6]['errors'][0] == expected_errors[3]
resp = ds.apply_common_filters(stix_objs, [filters[3]])
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 1
resp = ds.apply_common_filters(stix_objs, [filters[4]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# Note: if the 'revoked' property is not present in an object, we currently
# can't use such an expression to filter on it...
resp = ds.apply_common_filters(stix_objs, [filters[5]])
assert len(resp) == 0
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(stix_objs, [filters[6]])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(filters[6].op,
filters[6].field)
resp = ds.apply_common_filters(stix_objs, [filters[7]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
resp = ds.apply_common_filters(stix_objs, [filters[8], filters[9]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
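A minimal sketch of the behaviour these assertions exercise (not the library's actual implementation): each filter is compared against the named top-level property of every object, and only objects that satisfy every filter are kept.

def apply_filters_sketch(stix_objs, filters):
    # Sketch only: handles '=' and '!=' on top-level properties.
    results = []
    for obj in stix_objs:
        for flt in filters:
            value = obj.get(flt.field)
            if flt.op == "=" and value != flt.value:
                break
            if flt.op == "!=" and value == flt.value:
                break
        else:
            results.append(obj)
    return results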
def test_filters0(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters1(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters2(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
def test_filters3(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
def test_filters4(ds):
fltr4 = Filter("modified", "?", "2017-01-27T13:49:53.935Z")
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [fltr4])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(fltr4.op, fltr4.field)
def test_filters5(ds):
resp = ds.apply_common_filters(STIX_OBJS2, [Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
def test_filters6(ds):
fltr6 = Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [fltr6])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(fltr6.op, fltr6.field)
def test_filters7(ds):
fltr7 = Filter("notacommonproperty", "=", "bar")
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [fltr7])
assert str(excinfo.value) == ("Error, field: {0} is not supported for "
"filtering on.".format(fltr7.field))
def test_deduplicate(ds):
unique = ds.deduplicate(STIX_OBJS1)
# Only 3 objects are unique:
# 2 ids vary
# 2 modified times vary for a particular id
assert len(unique) == 3
ids = [obj['id'] for obj in unique]
mods = [obj['modified'] for obj in unique]
assert "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f" in ids
assert "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f" in ids
assert "2017-01-27T13:49:53.935Z" in mods
assert "2017-01-27T13:49:53.936Z" in mods
def test_add_remove_composite_datasource():
cds = CompositeDataSource()
ds1 = DataSource()
ds2 = DataSource()
ds3 = DataSink()
cds.add_data_source([ds1, ds2, ds1, ds3])
assert len(cds.get_all_data_sources()) == 2
cds.remove_data_source([ds1.id_, ds2.id_])
assert len(cds.get_all_data_sources()) == 0
with pytest.raises(ValueError):
cds.remove_data_source([ds3.id_])
def test_composite_datasource_operations():
BUNDLE1 = dict(id="bundle--%s" % make_id(),
objects=STIX_OBJS1,
spec_version="2.0",
type="bundle")
cds = CompositeDataSource()
ds1 = MemorySource(stix_data=BUNDLE1)
ds2 = MemorySource(stix_data=STIX_OBJS2)
cds.add_data_source([ds1, ds2])
indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
# In STIX_OBJS2 the 'modified' property was changed to a later time...
assert len(indicators) == 2
indicator = cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["type"] == "indicator"
query = [
Filter("type", "=", "indicator")
]
results = cds.query(query)
# STIX_OBJS2 has an indicator with a later 'modified' time, one with a
# different id, and one whose original 'modified' time is in STIX_OBJS1
assert len(results) == 3
# def test_data_source_file():
# ds = file.FileDataSource()

View File

@ -8,9 +8,12 @@ import stix2
VERIS = """{
"external_id": "0001AA7F-C601-424A-B2B8-BE6C9F5164E7",
"source_name": "veris",
"url": "https://github.com/vz-risk/VCDB/blob/master/data/json/0001AA7F-C601-424A-B2B8-BE6C9F5164E7.json"
"url": "https://github.com/vz-risk/VCDB/blob/master/data/json/0001AA7F-C601-424A-B2B8-BE6C9F5164E7.json",
"hashes": {
"SHA-256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b"
},
"external_id": "0001AA7F-C601-424A-B2B8-BE6C9F5164E7"
}"""
@ -18,6 +21,9 @@ def test_external_reference_veris():
ref = stix2.ExternalReference(
source_name="veris",
external_id="0001AA7F-C601-424A-B2B8-BE6C9F5164E7",
hashes={
"SHA-256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b"
},
url="https://github.com/vz-risk/VCDB/blob/master/data/json/0001AA7F-C601-424A-B2B8-BE6C9F5164E7.json",
)
@ -25,8 +31,8 @@ def test_external_reference_veris():
CAPEC = """{
"external_id": "CAPEC-550",
"source_name": "capec"
"source_name": "capec",
"external_id": "CAPEC-550"
}"""
@ -37,13 +43,13 @@ def test_external_reference_capec():
)
assert str(ref) == CAPEC
assert re.match("ExternalReference\(external_id=u?'CAPEC-550', source_name=u?'capec'\)", repr(ref))
assert re.match("ExternalReference\(source_name=u?'capec', external_id=u?'CAPEC-550'\)", repr(ref))
CAPEC_URL = """{
"external_id": "CAPEC-550",
"source_name": "capec",
"url": "http://capec.mitre.org/data/definitions/550.html"
"url": "http://capec.mitre.org/data/definitions/550.html",
"external_id": "CAPEC-550"
}"""
@ -58,8 +64,8 @@ def test_external_reference_capec_url():
THREAT_REPORT = """{
"description": "Threat report",
"source_name": "ACME Threat Intel",
"description": "Threat report",
"url": "http://www.example.com/threat-report.pdf"
}"""
@ -75,9 +81,9 @@ def test_external_reference_threat_report():
BUGZILLA = """{
"external_id": "1370",
"source_name": "ACME Bugzilla",
"url": "https://www.example.com/bugs/1370"
"url": "https://www.example.com/bugs/1370",
"external_id": "1370"
}"""
@ -92,8 +98,8 @@ def test_external_reference_bugzilla():
OFFLINE = """{
"description": "Threat report",
"source_name": "ACME Threat Intel"
"source_name": "ACME Threat Intel",
"description": "Threat report"
}"""
@ -104,7 +110,7 @@ def test_external_reference_offline():
)
assert str(ref) == OFFLINE
assert re.match("ExternalReference\(description=u?'Threat report', source_name=u?'ACME Threat Intel'\)", repr(ref))
assert re.match("ExternalReference\(source_name=u?'ACME Threat Intel', description=u?'Threat report'\)", repr(ref))
# Yikes! This works
assert eval("stix2." + repr(ref)) == ref

View File

@ -9,12 +9,12 @@ from .constants import IDENTITY_ID
EXPECTED = """{
"created": "2015-12-21T19:59:11.000Z",
"type": "identity",
"id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c",
"identity_class": "individual",
"created": "2015-12-21T19:59:11.000Z",
"modified": "2015-12-21T19:59:11.000Z",
"name": "John Smith",
"type": "identity"
"identity_class": "individual"
}"""

View File

@ -10,25 +10,25 @@ from .constants import FAKE_TIME, INDICATOR_ID, INDICATOR_KWARGS
EXPECTED_INDICATOR = """{
"created": "2017-01-01T00:00:01.000Z",
"type": "indicator",
"id": "indicator--01234567-89ab-cdef-0123-456789abcdef",
"created": "2017-01-01T00:00:01.000Z",
"modified": "2017-01-01T00:00:01.000Z",
"labels": [
"malicious-activity"
],
"modified": "2017-01-01T00:00:01.000Z",
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"type": "indicator",
"valid_from": "1970-01-01T00:00:01Z"
}"""
EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
created=STIXdatetime(2017, 1, 1, 0, 0, 1, tzinfo=<UTC>),
id='indicator--01234567-89ab-cdef-0123-456789abcdef',
labels=['malicious-activity'],
modified=STIXdatetime(2017, 1, 1, 0, 0, 1, tzinfo=<UTC>),
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
type='indicator',
valid_from=datetime.datetime(1970, 1, 1, 0, 0, 1, tzinfo=<UTC>)
id='indicator--01234567-89ab-cdef-0123-456789abcdef',
created='2017-01-01T00:00:01.000Z',
modified='2017-01-01T00:00:01.000Z',
labels=['malicious-activity'],
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
valid_from='1970-01-01T00:00:01Z'
""".split()) + ")"
@ -174,3 +174,23 @@ def test_parse_indicator(data):
assert idctr.valid_from == dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
assert idctr.labels[0] == "malicious-activity"
assert idctr.pattern == "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']"
def test_invalid_indicator_pattern():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.Indicator(
labels=['malicious-activity'],
pattern="file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e'",
)
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.prop_name == 'pattern'
assert 'input is missing square brackets' in excinfo.value.reason
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.Indicator(
labels=['malicious-activity'],
pattern='[file:hashes.MD5 = "d41d8cd98f00b204e9800998ecf8427e"]',
)
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.prop_name == 'pattern'
assert 'mismatched input' in excinfo.value.reason

View File

@ -9,21 +9,21 @@ from .constants import INTRUSION_SET_ID
EXPECTED = """{
"type": "intrusion-set",
"id": "intrusion-set--4e78f46f-a023-4e5f-bc24-71b3ca22ec29",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:48.000Z",
"modified": "2016-04-06T20:03:48.000Z",
"name": "Bobcat Breakin",
"description": "Incidents usually feature a shared TTP of a bobcat being released...",
"aliases": [
"Zookeeper"
],
"created": "2016-04-06T20:03:48.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Incidents usually feature a shared TTP of a bobcat being released...",
"goals": [
"acquisition-theft",
"harassment",
"damage"
],
"id": "intrusion-set--4e78f46f-a023-4e5f-bc24-71b3ca22ec29",
"modified": "2016-04-06T20:03:48.000Z",
"name": "Bobcat Breakin",
"type": "intrusion-set"
]
}"""

View File

@ -10,14 +10,14 @@ from .constants import FAKE_TIME, MALWARE_ID, MALWARE_KWARGS
EXPECTED_MALWARE = """{
"created": "2016-05-12T08:17:27.000Z",
"type": "malware",
"id": "malware--fedcba98-7654-3210-fedc-ba9876543210",
"labels": [
"ransomware"
],
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
"name": "Cryptolocker",
"type": "malware"
"labels": [
"ransomware"
]
}"""

View File

@ -10,36 +10,36 @@ from .constants import MARKING_DEFINITION_ID
EXPECTED_TLP_MARKING_DEFINITION = """{
"type": "marking-definition",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"created": "2017-01-20T00:00:00Z",
"definition_type": "tlp",
"definition": {
"tlp": "white"
},
"definition_type": "tlp",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"type": "marking-definition"
}
}"""
EXPECTED_STATEMENT_MARKING_DEFINITION = """{
"type": "marking-definition",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"created": "2017-01-20T00:00:00Z",
"definition_type": "statement",
"definition": {
"statement": "Copyright 2016, Example Corp"
},
"definition_type": "statement",
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"type": "marking-definition"
}
}"""
EXPECTED_CAMPAIGN_WITH_OBJECT_MARKING = """{
"created": "2016-04-06T20:03:00.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"type": "campaign",
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:00.000Z",
"modified": "2016-04-06T20:03:00.000Z",
"name": "Green Group Attacks Against Finance",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"type": "campaign"
]
}"""
EXPECTED_GRANULAR_MARKING = """{
@ -53,8 +53,12 @@ EXPECTED_GRANULAR_MARKING = """{
}"""
EXPECTED_CAMPAIGN_WITH_GRANULAR_MARKINGS = """{
"created": "2016-04-06T20:03:00.000Z",
"type": "campaign",
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:00.000Z",
"modified": "2016-04-06T20:03:00.000Z",
"name": "Green Group Attacks Against Finance",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"granular_markings": [
{
@ -63,11 +67,7 @@ EXPECTED_CAMPAIGN_WITH_GRANULAR_MARKINGS = """{
"description"
]
}
],
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"modified": "2016-04-06T20:03:00.000Z",
"name": "Green Group Attacks Against Finance",
"type": "campaign"
]
}"""
@ -75,7 +75,7 @@ def test_marking_def_example_with_tlp():
assert str(TLP_WHITE) == EXPECTED_TLP_MARKING_DEFINITION
def test_marking_def_example_with_statement():
def test_marking_def_example_with_statement_positional_argument():
marking_definition = stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
@ -86,12 +86,13 @@ def test_marking_def_example_with_statement():
assert str(marking_definition) == EXPECTED_STATEMENT_MARKING_DEFINITION
def test_marking_def_example_with_positional_statement():
def test_marking_def_example_with_kwargs_statement():
kwargs = dict(statement="Copyright 2016, Example Corp")
marking_definition = stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="statement",
definition=stix2.StatementMarking("Copyright 2016, Example Corp")
definition=stix2.StatementMarking(**kwargs)
)
assert str(marking_definition) == EXPECTED_STATEMENT_MARKING_DEFINITION
@ -182,4 +183,64 @@ def test_parse_marking_definition(data):
assert gm.definition_type == "tlp"
@stix2.common.CustomMarking('x-new-marking-type', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
])
class NewMarking(object):
def __init__(self, property2=None, **kwargs):
return
def test_registered_custom_marking():
nm = NewMarking(property1='something', property2=55)
marking_def = stix2.MarkingDefinition(
id="marking-definition--00000000-0000-0000-0000-000000000012",
created="2017-01-22T00:00:00.000Z",
definition_type="x-new-marking-type",
definition=nm
)
assert marking_def.type == "marking-definition"
assert marking_def.id == "marking-definition--00000000-0000-0000-0000-000000000012"
assert marking_def.created == dt.datetime(2017, 1, 22, 0, 0, 0, tzinfo=pytz.utc)
assert marking_def.definition.property1 == "something"
assert marking_def.definition.property2 == 55
assert marking_def.definition_type == "x-new-marking-type"
def test_not_registered_marking_raises_exception():
with pytest.raises(ValueError) as excinfo:
# Used custom object on purpose to demonstrate a not-registered marking
@stix2.sdo.CustomObject('x-new-marking-type2', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
])
class NewObject2(object):
def __init__(self, property2=None, **kwargs):
return
no = NewObject2(property1='something', property2=55)
stix2.MarkingDefinition(
id="marking-definition--00000000-0000-0000-0000-000000000012",
created="2017-01-22T00:00:00.000Z",
definition_type="x-new-marking-type2",
definition=no
)
assert str(excinfo.value) == "definition_type must be a valid marking type"
def test_marking_wrong_type_construction():
with pytest.raises(ValueError) as excinfo:
# Test passing wrong type for properties.
@stix2.CustomMarking('x-new-marking-type2', ("a", "b"))
class NewObject3(object):
pass
assert str(excinfo.value) == "Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]"
# TODO: Add other examples

View File

@ -8,22 +8,24 @@ import stix2
from .constants import OBSERVED_DATA_ID
OBJECTS_REGEX = re.compile('\"objects\": {(?:.*?)(?:(?:[^{]*?)|(?:{[^{]*?}))*}', re.DOTALL)
EXPECTED = """{
"created": "2016-04-06T19:58:16.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"first_observed": "2015-12-21T19:00:00Z",
"type": "observed-data",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"last_observed": "2015-12-21T19:00:00Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T19:58:16.000Z",
"modified": "2016-04-06T19:58:16.000Z",
"first_observed": "2015-12-21T19:00:00Z",
"last_observed": "2015-12-21T19:00:00Z",
"number_observed": 50,
"objects": {
"0": {
"name": "foo.exe",
"type": "file"
"type": "file",
"name": "foo.exe"
}
},
"type": "observed-data"
}
}"""
@ -48,27 +50,27 @@ def test_observed_data_example():
EXPECTED_WITH_REF = """{
"created": "2016-04-06T19:58:16.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"first_observed": "2015-12-21T19:00:00Z",
"type": "observed-data",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"last_observed": "2015-12-21T19:00:00Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T19:58:16.000Z",
"modified": "2016-04-06T19:58:16.000Z",
"first_observed": "2015-12-21T19:00:00Z",
"last_observed": "2015-12-21T19:00:00Z",
"number_observed": 50,
"objects": {
"0": {
"name": "foo.exe",
"type": "file"
"type": "file",
"name": "foo.exe"
},
"1": {
"type": "directory",
"path": "/usr/home",
"contains_refs": [
"0"
],
"path": "/usr/home",
"type": "directory"
]
}
},
"type": "observed-data"
}
}"""
@ -125,6 +127,42 @@ def test_observed_data_example_with_bad_refs():
assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope"
def test_observed_data_example_with_non_dictionary():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.ObservedData(
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T19:58:16.000Z",
modified="2016-04-06T19:58:16.000Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects="file: foo.exe",
)
assert excinfo.value.cls == stix2.ObservedData
assert excinfo.value.prop_name == "objects"
assert 'must contain a dictionary' in excinfo.value.reason
def test_observed_data_example_with_empty_dictionary():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.ObservedData(
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T19:58:16.000Z",
modified="2016-04-06T19:58:16.000Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects={},
)
assert excinfo.value.cls == stix2.ObservedData
assert excinfo.value.prop_name == "objects"
assert 'must contain a non-empty dictionary' in excinfo.value.reason
@pytest.mark.parametrize("data", [
EXPECTED,
{
@ -173,7 +211,7 @@ def test_parse_observed_data(data):
}""",
])
def test_parse_artifact_valid(data):
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["0"].type == "artifact"
@ -194,7 +232,7 @@ def test_parse_artifact_valid(data):
}""",
])
def test_parse_artifact_invalid(data):
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
with pytest.raises(ValueError):
stix2.parse(odata_str)
@ -204,6 +242,7 @@ def test_artifact_example_dependency_error():
stix2.Artifact(url="http://example.com/sirvizio.exe")
assert excinfo.value.dependencies == [("hashes", "url")]
assert str(excinfo.value) == "The property dependencies for Artifact: (hashes) are not met."
@pytest.mark.parametrize("data", [
@ -215,7 +254,7 @@ def test_artifact_example_dependency_error():
}""",
])
def test_parse_autonomous_system_valid(data):
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["0"].type == "autonomous-system"
assert odata.objects["0"].number == 15139
@ -358,7 +397,7 @@ def test_parse_email_message_not_multipart(data):
}""",
])
def test_parse_file_archive(data):
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["3"].extensions['archive-ext'].version == "5.0"
@ -416,6 +455,8 @@ def test_parse_email_message_with_at_least_one_error(data):
assert excinfo.value.cls == stix2.EmailMIMEComponent
assert excinfo.value.properties == ["body", "body_raw_ref"]
assert "At least one of the" in str(excinfo.value)
assert "must be populated" in str(excinfo.value)
@pytest.mark.parametrize("data", [
@ -555,6 +596,7 @@ def test_artifact_mutual_exclusion_error():
assert excinfo.value.cls == stix2.Artifact
assert excinfo.value.properties == ["payload_bin", "url"]
assert 'are mutually exclusive' in str(excinfo.value)
def test_directory_example():
@ -800,6 +842,8 @@ def test_file_example_encryption_error():
assert excinfo.value.cls == stix2.File
assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")]
assert "property dependencies" in str(excinfo.value)
assert "are not met" in str(excinfo.value)
with pytest.raises(stix2.exceptions.DependentPropertiesError) as excinfo:
stix2.File(name="qwerty.dll",
@ -925,6 +969,10 @@ def test_process_example_empty_error():
properties_of_process = list(stix2.Process._properties.keys())
properties_of_process.remove("type")
assert excinfo.value.properties == sorted(properties_of_process)
msg = "At least one of the ({1}) properties for {0} must be populated."
msg = msg.format(stix2.Process.__name__,
", ".join(sorted(properties_of_process)))
assert str(excinfo.value) == msg
def test_process_example_empty_with_extensions():

View File

@ -5,10 +5,10 @@ from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
from stix2.observables import EmailMIMEComponent, ExtensionsProperty
from stix2.properties import (BinaryProperty, BooleanProperty,
DictionaryProperty, EmbeddedObjectProperty,
EnumProperty, HashesProperty, HexProperty,
IDProperty, IntegerProperty, ListProperty,
Property, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
EnumProperty, FloatProperty, HashesProperty,
HexProperty, IDProperty, IntegerProperty,
ListProperty, Property, ReferenceProperty,
StringProperty, TimestampProperty, TypeProperty)
from .constants import FAKE_TIME
@ -119,6 +119,27 @@ def test_integer_property_invalid(value):
int_prop.clean(value)
@pytest.mark.parametrize("value", [
2,
-1,
3.14,
False,
])
def test_float_property_valid(value):
int_prop = FloatProperty()
assert int_prop.clean(value) is not None
@pytest.mark.parametrize("value", [
"something",
StringProperty(),
])
def test_float_property_invalid(value):
int_prop = FloatProperty()
with pytest.raises(ValueError):
int_prop.clean(value)
@pytest.mark.parametrize("value", [
True,
False,
@ -206,15 +227,42 @@ def test_dictionary_property_valid(d):
@pytest.mark.parametrize("d", [
{'a': 'something'},
{'a'*300: 'something'},
{'Hey!': 'something'},
[{'a': 'something'}, "Invalid dictionary key a: (shorter than 3 characters)."],
[{'a'*300: 'something'}, "Invalid dictionary key aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaa: (longer than 256 characters)."],
[{'Hey!': 'something'}, "Invalid dictionary key Hey!: (contains characters other thanlowercase a-z, "
"uppercase A-Z, numerals 0-9, hyphen (-), or underscore (_))."],
])
def test_dictionary_property_invalid_key(d):
dict_prop = DictionaryProperty()
with pytest.raises(DictionaryKeyError) as excinfo:
dict_prop.clean(d[0])
assert str(excinfo.value) == d[1]
@pytest.mark.parametrize("d", [
({}, "The dictionary property must contain a non-empty dictionary"),
# TODO: This error message could be made more helpful. The error is caused
# because `json.loads()` doesn't like the *single* quotes around the key
# name, even though they are valid in a Python dictionary. While technically
# accurate (a string is not a dictionary), if we want to be able to load
# string-encoded "dictionaries" that really are dictionaries, we need a better
# error message or an alternative to `json.loads()` ... and preferably *not* `eval()`. :-)
# Changing the following to `'{"description": "something"}'` does not cause
# any ValueError to be raised.
("{'description': 'something'}", "The dictionary property must contain a dictionary"),
])
def test_dictionary_property_invalid(d):
dict_prop = DictionaryProperty()
with pytest.raises(DictionaryKeyError):
dict_prop.clean(d)
with pytest.raises(ValueError) as excinfo:
dict_prop.clean(d[0])
assert str(excinfo.value) == d[1]
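As the TODO above notes, the root cause is that `json.loads()` only accepts double-quoted keys; a quick standard-library illustration (independent of stix2):

import json

json.loads('{"description": "something"}')   # parses fine, returns a dict
# json.loads("{'description': 'something'}") # would raise a JSONDecodeError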
@pytest.mark.parametrize("value", [
@ -250,10 +298,18 @@ def test_embedded_property():
emb_prop.clean("string")
def test_enum_property():
enum_prop = EnumProperty(['a', 'b', 'c'])
@pytest.mark.parametrize("value", [
['a', 'b', 'c'],
('a', 'b', 'c'),
'b',
])
def test_enum_property_valid(value):
enum_prop = EnumProperty(value)
assert enum_prop.clean('b')
def test_enum_property_invalid():
enum_prop = EnumProperty(['a', 'b', 'c'])
with pytest.raises(ValueError):
enum_prop.clean('z')

View File

@ -10,13 +10,13 @@ from .constants import (FAKE_TIME, INDICATOR_ID, MALWARE_ID, RELATIONSHIP_ID,
EXPECTED_RELATIONSHIP = """{
"created": "2016-04-06T20:06:37.000Z",
"type": "relationship",
"id": "relationship--00000000-1111-2222-3333-444444444444",
"created": "2016-04-06T20:06:37.000Z",
"modified": "2016-04-06T20:06:37.000Z",
"relationship_type": "indicates",
"source_ref": "indicator--01234567-89ab-cdef-0123-456789abcdef",
"target_ref": "malware--fedcba98-7654-3210-fedc-ba9876543210",
"type": "relationship"
"target_ref": "malware--fedcba98-7654-3210-fedc-ba9876543210"
}"""

View File

@ -9,22 +9,22 @@ from .constants import INDICATOR_KWARGS, REPORT_ID
EXPECTED = """{
"created": "2015-12-21T19:59:11.000Z",
"created_by_ref": "identity--a463ffb3-1bd9-4d94-b02d-74e4f1658283",
"description": "A simple report with an indicator and campaign",
"type": "report",
"id": "report--84e4d88f-44ea-4bcd-bbf3-b2c1c320bcb3",
"labels": [
"campaign"
],
"created_by_ref": "identity--a463ffb3-1bd9-4d94-b02d-74e4f1658283",
"created": "2015-12-21T19:59:11.000Z",
"modified": "2015-12-21T19:59:11.000Z",
"name": "The Black Vine Cyberespionage Group",
"description": "A simple report with an indicator and campaign",
"published": "2016-01-20T17:00:00Z",
"object_refs": [
"indicator--26ffb872-1dd9-446e-b6f5-d58527e5b5d2",
"campaign--83422c77-904c-4dc1-aff5-5c38f3a2c55c",
"relationship--f82356ae-fe6c-437c-9c24-6b64314ae68a"
],
"published": "2016-01-20T17:00:00Z",
"type": "report"
"labels": [
"campaign"
]
}"""

View File

@ -9,11 +9,11 @@ from .constants import INDICATOR_ID, SIGHTING_ID, SIGHTING_KWARGS
EXPECTED_SIGHTING = """{
"created": "2016-04-06T20:06:37.000Z",
"type": "sighting",
"id": "sighting--bfbc19db-ec35-4e45-beed-f8bde2a772fb",
"created": "2016-04-06T20:06:37.000Z",
"modified": "2016-04-06T20:06:37.000Z",
"sighting_of_ref": "indicator--01234567-89ab-cdef-0123-456789abcdef",
"type": "sighting",
"where_sighted_refs": [
"identity--8cc7afd6-5455-4d2b-a736-e614ee631d99"
]

View File

@ -9,16 +9,16 @@ from .constants import THREAT_ACTOR_ID
EXPECTED = """{
"created": "2016-04-06T20:03:48.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "The Evil Org threat actor group",
"type": "threat-actor",
"id": "threat-actor--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"labels": [
"crime-syndicate"
],
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:48.000Z",
"modified": "2016-04-06T20:03:48.000Z",
"name": "Evil Org",
"type": "threat-actor"
"description": "The Evil Org threat actor group",
"labels": [
"crime-syndicate"
]
}"""

View File

@ -9,15 +9,15 @@ from .constants import TOOL_ID
EXPECTED = """{
"created": "2016-04-06T20:03:48.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"type": "tool",
"id": "tool--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"labels": [
"remote-access"
],
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"created": "2016-04-06T20:03:48.000Z",
"modified": "2016-04-06T20:03:48.000Z",
"name": "VNC",
"type": "tool"
"labels": [
"remote-access"
]
}"""

View File

@ -90,6 +90,11 @@ def test_versioning_error_bad_modified_value():
assert excinfo.value.prop_name == "modified"
assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime."
msg = "Invalid value for {0} '{1}': {2}"
msg = msg.format(stix2.Campaign.__name__, "modified",
"The new modified datetime cannot be before the current modified datatime.")
assert str(excinfo.value) == msg
def test_versioning_error_usetting_required_property():
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
@ -100,6 +105,10 @@ def test_versioning_error_usetting_required_property():
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.properties == ["name"]
msg = "No values for required properties for {0}: ({1})."
msg = msg.format(stix2.Campaign.__name__, "name")
assert str(excinfo.value) == msg
def test_versioning_error_new_version_of_revoked():
campaign_v1 = stix2.Campaign(**CAMPAIGN_MORE_KWARGS)
@ -107,8 +116,10 @@ def test_versioning_error_new_version_of_revoked():
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.new_version(name="barney")
assert str(excinfo.value) == "Cannot create a new version of a revoked object."
assert excinfo.value.called_by == "new_version"
assert str(excinfo.value) == "Cannot create a new version of a revoked object."
def test_versioning_error_revoke_of_revoked():
@ -117,8 +128,10 @@ def test_versioning_error_revoke_of_revoked():
with pytest.raises(stix2.exceptions.RevokeError) as excinfo:
campaign_v2.revoke()
assert str(excinfo.value) == "Cannot revoke an already revoked object."
assert excinfo.value.called_by == "revoke"
assert str(excinfo.value) == "Cannot revoke an already revoked object."
def test_making_new_version_dict():

View File

@ -9,17 +9,17 @@ from .constants import VULNERABILITY_ID
EXPECTED = """{
"created": "2016-05-12T08:17:27.000Z",
"external_references": [
{
"external_id": "CVE-2016-1234",
"source_name": "cve"
}
],
"type": "vulnerability",
"id": "vulnerability--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
"name": "CVE-2016-1234",
"type": "vulnerability"
"external_references": [
{
"source_name": "cve",
"external_id": "CVE-2016-1234"
}
]
}"""

View File

@ -29,6 +29,9 @@ class STIXdatetime(dt.datetime):
self.precision = precision
return self
def __repr__(self):
return "'%s'" % format_datetime(self)
def get_timestamp():
return STIXdatetime.now(tz=pytz.UTC)
@ -82,7 +85,7 @@ def parse_into_datetime(value, precision=None):
# Ensure correct precision
if not precision:
return ts
return STIXdatetime(ts, precision=precision)
ms = ts.microsecond
if precision == 'second':
ts = ts.replace(microsecond=0)
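
The precision handling above drops sub-second digits when 'second' precision is requested, and the new __repr__ prints timestamps in STIX format. A brief usage sketch, assuming parse_into_datetime is imported from stix2.utils and the timestamp value is illustrative:

from stix2.utils import parse_into_datetime

ts = parse_into_datetime('2016-04-06T20:06:37.123Z', precision='second')
# microseconds are zeroed out and the requested precision is kept on the object
print(repr(ts))  # something like '2016-04-06T20:06:37Z', via the __repr__ above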
@ -119,6 +122,41 @@ def get_dict(data):
raise ValueError("Cannot convert '%s' to dictionary." % str(data))
def find_property_index(obj, properties, tuple_to_find):
"""Recursively find the property in the object model, return the index
according to the _properties OrderedDict. If its a list look for
individual objects.
"""
from .base import _STIXBase
try:
if tuple_to_find[1] in obj._inner.values():
return properties.index(tuple_to_find[0])
raise ValueError
except ValueError:
for pv in obj._inner.values():
if isinstance(pv, list):
for item in pv:
if isinstance(item, _STIXBase):
val = find_property_index(item,
item.object_properties(),
tuple_to_find)
if val is not None:
return val
elif isinstance(pv, dict):
if pv.get(tuple_to_find[0]) is not None:
try:
return int(tuple_to_find[0])
except ValueError:
return len(tuple_to_find[0])
for item in pv.values():
if isinstance(item, _STIXBase):
val = find_property_index(item,
item.object_properties(),
tuple_to_find)
if val is not None:
return val
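
A minimal sketch of calling the helper above directly; the Indicator values are illustrative only, and object_properties() is the accessor the recursion itself relies on:

from stix2 import Indicator
from stix2.utils import find_property_index

indicator = Indicator(name="Example",
                      labels=["malicious-activity"],
                      pattern="[ipv4-addr:value = '10.0.0.1']")
props = indicator.object_properties()
# returns the position of 'type' within Indicator's declared property order
print(find_property_index(indicator, props, ('type', 'indicator')))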
def new_version(data, **kwargs):
"""Create a new version of a STIX object, by modifying properties and
updating the `modified` property.