Merge remote-tracking branch 'origin/datastores' into filter

stix2.1
Greg Back 2017-08-31 17:18:20 +00:00
commit 71d42b0e51
22 changed files with 583 additions and 195 deletions

View File

@ -1,7 +1,7 @@
[settings]
check=1
diff=1
known_third_party=dateutil,pytest,pytz,six,requests,taxii2_client
known_third_party=ordereddict,dateutil,pytest,pytz,requests,simplejson,six,stix2patterns,stix2validator,taxii2client
known_first_party=stix2
not_skip=__init__.py
force_sort_within_sections=1

View File

@ -39,8 +39,8 @@ constructor:
from stix2 import Indicator
indicator = Indicator(name="File hash for malware variant",
labels=['malicious-activity'],
pattern='file:hashes.md5 = "d41d8cd98f00b204e9800998ecf8427e"')
labels=["malicious-activity"],
pattern="[file:hashes.md5 = 'd41d8cd98f00b204e9800998ecf8427e']")
Certain required attributes of all objects will be set automatically if
not provided as keyword arguments:

View File

@ -47,11 +47,14 @@ setup(
keywords="stix stix2 json cti cyber threat intelligence",
packages=find_packages(),
install_requires=[
'ordereddict',
'ordereddict ; python_version<"2.7"',
'python-dateutil',
'pytz',
'requests',
'simplejson',
'six'
'six',
'stix2-patterns',
'stix2-validator',
'taxii2-client',
],
)

View File

@ -3,7 +3,7 @@
# flake8: noqa
from . import exceptions
from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
from .common import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, CustomMarking,
ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .core import Bundle, _register_type, parse

View File

@ -38,7 +38,7 @@ def get_required_properties(properties):
class _STIXBase(collections.Mapping):
"""Base class for STIX object types"""
def _object_properties(self):
def object_properties(self):
return list(self._properties.keys())
def _check_property(self, prop_name, prop, kwargs):
@ -146,7 +146,7 @@ class _STIXBase(collections.Mapping):
super(_STIXBase, self).__setattr__(name, value)
def __str__(self):
properties = self._object_properties()
properties = self.object_properties()
def sort_by(element):
return find_property_index(self, properties, element)
@ -157,7 +157,7 @@ class _STIXBase(collections.Mapping):
separators=(",", ": "))
def __repr__(self):
props = [(k, self[k]) for k in self._object_properties() if self.get(k)]
props = [(k, self[k]) for k in self.object_properties() if self.get(k)]
return "{0}({1})".format(self.__class__.__name__,
", ".join(["{0!s}={1!r}".format(k, v) for k, v in props]))

View File

@ -6,10 +6,9 @@ except ImportError:
from ordereddict import OrderedDict
from .base import _STIXBase
from .properties import (BooleanProperty, HashesProperty, IDProperty,
ListProperty, Property, ReferenceProperty,
SelectorProperty, StringProperty, TimestampProperty,
TypeProperty)
from .properties import (HashesProperty, IDProperty, ListProperty, Property,
ReferenceProperty, SelectorProperty, StringProperty,
TimestampProperty, TypeProperty)
from .utils import NOW, get_dict
@ -110,17 +109,54 @@ class MarkingDefinition(_STIXBase):
super(MarkingDefinition, self).__init__(**kwargs)
def register_marking(new_marking):
"""Register a custom STIX Marking Definition type.
"""
OBJ_MAP_MARKING[new_marking._type] = new_marking
OBJ_MAP_MARKING = {
'tlp': TLPMarking,
'statement': StatementMarking,
}
def _register_marking(cls):
"""Register a custom STIX Marking Definition type.
"""
OBJ_MAP_MARKING[cls._type] = cls
return cls
def CustomMarking(type='x-custom-marking', properties=None):
"""
Custom STIX Marking decorator.
Examples:
@CustomMarking('x-custom-marking', [
('property1', StringProperty(required=True)),
('property2', IntegerProperty()),
])
class MyNewMarkingObjectType():
pass
"""
def custom_builder(cls):
class _Custom(cls, _STIXBase):
_type = type
_properties = OrderedDict()
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties)
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
cls.__init__(self, **kwargs)
_register_marking(_Custom)
return _Custom
return custom_builder
TLP_WHITE = MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
@ -148,17 +184,3 @@ TLP_RED = MarkingDefinition(
definition_type="tlp",
definition=TLPMarking(tlp="red")
)
COMMON_PROPERTIES = OrderedDict()
COMMON_PROPERTIES.update([
# 'type' and 'id' should be defined on each individual type
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('revoked', BooleanProperty()),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
])

View File

@ -121,7 +121,7 @@ class DependentPropertiesError(STIXError, TypeError):
def __str__(self):
msg = "The property dependencies for {0}: ({1}) are not met."
return msg.format(self.cls.__name__,
", ".join(x for x in self.dependencies))
", ".join(name for x in self.dependencies for name in x))
class AtLeastOnePropertyError(STIXError, TypeError):

View File

@ -804,7 +804,7 @@ def CustomObservable(type='x-custom-observable', properties=None):
('type', TypeProperty(_type)),
])
if not properties:
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
_properties.update(properties)

View File

@ -6,6 +6,7 @@ import re
import uuid
from six import string_types, text_type
from stix2patterns.validator import run_validator
from .base import _STIXBase
from .exceptions import DictionaryKeyError
@ -370,3 +371,17 @@ class EnumProperty(StringProperty):
if value not in self.allowed:
raise ValueError("value '%s' is not valid for this enumeration." % value)
return self.string_type(value)
class PatternProperty(StringProperty):
def __init__(self, **kwargs):
super(PatternProperty, self).__init__(**kwargs)
def clean(self, value):
str_value = super(PatternProperty, self).clean(value)
errors = run_validator(str_value)
if errors:
raise ValueError(str(errors[0]))
return self.string_type(value)

View File

@ -11,8 +11,8 @@ from .base import _STIXBase
from .common import ExternalReference, GranularMarking, KillChainPhase
from .observables import ObservableProperty
from .properties import (BooleanProperty, IDProperty, IntegerProperty,
ListProperty, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
ListProperty, PatternProperty, ReferenceProperty,
StringProperty, TimestampProperty, TypeProperty)
from .utils import NOW
@ -117,7 +117,7 @@ class Indicator(_STIXBase):
('labels', ListProperty(StringProperty, required=True)),
('name', StringProperty()),
('description', StringProperty()),
('pattern', StringProperty(required=True)),
('pattern', PatternProperty(required=True)),
('valid_from', TimestampProperty(default=lambda: NOW)),
('valid_until', TimestampProperty()),
('kill_chain_phases', ListProperty(KillChainPhase)),
@ -330,13 +330,10 @@ def CustomObject(type='x-custom-type', properties=None):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
])
if not properties:
if not properties or not isinstance(properties, list):
raise ValueError("Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]")
normal_properties = [x for x in properties if not x[0].startswith("x_")]
custom_properties = [x for x in properties if x[0].startswith("x_")]
_properties.update(normal_properties)
_properties.update([x for x in properties if not x[0].startswith("x_")])
# This is to follow the general properties structure.
_properties.update([
@ -348,7 +345,7 @@ def CustomObject(type='x-custom-type', properties=None):
])
# Put all custom properties at the bottom, sorted alphabetically.
_properties.update(sorted(custom_properties, key=lambda x: x[0]))
_properties.update(sorted([x for x in properties if x[0].startswith("x_")], key=lambda x: x[0]))
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)

View File

@ -16,7 +16,6 @@ Notes:
"""
import copy
import uuid
from six import iteritems
@ -35,11 +34,11 @@ class DataStore(object):
this abstract class for the specific data store.
"""
def __init__(self, name="DataStore"):
def __init__(self, name="DataStore", source=None, sink=None):
self.name = name
self.id = make_id()
self.source = None
self.sink = None
self.id_ = make_id()
self.source = source
self.sink = sink
def get(self, stix_id):
"""
@ -109,14 +108,14 @@ class DataSink(object):
different sink components.
Attributes:
id (str): A unique UUIDv4 to identify this DataSink.
id_ (str): A unique UUIDv4 to identify this DataSink.
name (str): The descriptive name that identifies this DataSink.
"""
def __init__(self, name="DataSink"):
self.name = name
self.id = make_id()
self.id_ = make_id()
def add(self, stix_objs):
"""
@ -134,17 +133,15 @@ class DataSource(object):
different source components.
Attributes:
id (str): A unique UUIDv4 to identify this DataSource.
id_ (str): A unique UUIDv4 to identify this DataSource.
name (str): The descriptive name that identifies this DataSource.
filters (dict): A collection of filters present in this DataSource.
filter_allowed (dict): A collection of the allowed filters in this
DataSource.
filters (set): A collection of filters present in this DataSource.
"""
def __init__(self, name="DataSource"):
self.name = name
self.id = make_id()
self.id_ = make_id()
self.filters = set()
def get(self, stix_id, _composite_filters=None):
@ -213,26 +210,26 @@ class DataSource(object):
"""Add multiple filters to the DataSource.
Args:
filter (list): list of filters (dict) to add to the Data Source.
filters (list): list of filters (dict) to add to the Data Source.
"""
for filter in filters:
self.add_filter(filter)
for filter_ in filters:
self.add_filter(filter_)
def add_filter(self, filter):
def add_filter(self, filter_):
"""Add a filter."""
# check filter field is a supported STIX 2.0 common field
if filter.field not in STIX_COMMON_FIELDS:
if filter_.field not in STIX_COMMON_FIELDS:
raise ValueError("Filter 'field' is not a STIX 2.0 common property. Currently only STIX object common properties supported")
# check filter operator is supported
if filter.op not in FILTER_OPS:
if filter_.op not in FILTER_OPS:
raise ValueError("Filter operation(from 'op' field) not supported")
# check filter value type is supported
if type(filter.value) not in FILTER_VALUE_TYPES:
if type(filter_.value) not in FILTER_VALUE_TYPES:
raise ValueError("Filter 'value' type is not supported. The type(value) must be python immutable type or dictionary")
self.filters.add(filter)
self.filters.add(filter_)
# TODO: Do we need a remove_filter function?
@ -256,28 +253,34 @@ class DataSource(object):
for stix_obj in stix_objs:
clean = True
for filter_ in query:
# skip filter as filter was identified (when added) as
# not a common filter
if filter_.field not in STIX_COMMON_FIELDS:
continue
# check filter "field" is in STIX object - if cant be applied
# due to STIX object, STIX object is discarded (i.e. did not
# make it through the filter)
if filter_.field not in stix_obj.keys():
clean = False
break
try:
# skip filter as filter was identified (when added) as
# not a common filter
if filter_.field not in STIX_COMMON_FIELDS:
raise Exception("Error, field: {0} is not supported for filtering on.".format(filter_.field))
# For properties like granular_markings and external_references
# need to break the first property from the string.
if "." in filter_.field:
field = filter_.field.split(".")[0]
else:
field = filter_.field
# check filter "field" is in STIX object - if cant be
# applied due to STIX object, STIX object is discarded
# (i.e. did not make it through the filter)
if field not in stix_obj.keys():
clean = False
break
match = STIX_COMMON_FILTERS_MAP[filter_.field](filter_, stix_obj)
if not match:
clean = False
break
elif match == -1:
# error, filter operator not supported for specified field:
pass
raise Exception("Error, filter operator: {0} not supported for specified field: {1}".format(filter_.op, filter_.field))
except Exception as e:
print(e)
raise ValueError(e)
# if object unmarked after all filters, add it
if clean:
@ -361,14 +364,14 @@ class CompositeDataSource(DataSource):
# for every configured Data Source, call its retrieve handler
for ds_id, ds in iteritems(self.data_sources):
data = ds.get(stix_id=stix_id, _composite_filters=list(self.filters))
all_data.extend(data)
all_data.append(data)
# remove duplicate versions
if len(all_data) > 0:
all_data = self.deduplicate(all_data)
# reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]
return stix_obj
@ -393,10 +396,14 @@ class CompositeDataSource(DataSource):
"""
all_data = []
all_filters = self.filters
if _composite_filters:
all_filters = set(self.filters).update(_composite_filters)
# retrieve STIX objects from all configured data sources
for ds_id, ds in iteritems(self.data_sources):
data = ds.all_versions(stix_id=stix_id, _composite_filters=list(self.filters))
data = ds.all_versions(stix_id=stix_id, _composite_filters=list(all_filters))
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0 objects
@ -426,11 +433,15 @@ class CompositeDataSource(DataSource):
query = []
all_data = []
all_filters = self.filters
if _composite_filters:
all_filters = set(self.filters).update(_composite_filters)
# federate query to all attached data sources,
# pass composite filters to id
for ds_id, ds in iteritems(self.data_sources):
data = ds.query(query=query, _composite_filters=list(self.filters))
data = ds.query(query=query, _composite_filters=list(all_filters))
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0
@ -449,14 +460,14 @@ class CompositeDataSource(DataSource):
"""
for ds in data_sources:
if issubclass(ds, DataSource):
if self.data_sources[ds['id']] in self.data_sources.keys():
if issubclass(ds.__class__, DataSource):
if ds.id_ in self.data_sources:
# data source already attached to Composite Data Source
continue
# add data source to Composite Data Source
# (its id will be its key identifier)
self.data_sources[ds['id']] = ds
self.data_sources[ds.id_] = ds
else:
# the Data Source object is not a proper subclass
# of DataSource Abstract Class
@ -469,24 +480,18 @@ class CompositeDataSource(DataSource):
"""Remove/detach Data Source from the Composite Data Source instance
Args:
data_source_ids (list): a list of Data Source
id's(which are strings)
data_source_ids (list): a list of Data Source identifiers.
"""
for id_ in data_source_ids:
try:
if self.data_sources[id_]:
del self.data_sources[id_]
except KeyError:
# Data Source 'id' was not found in CompositeDataSource's
# list of data sources
pass
if id_ in self.data_sources:
del self.data_sources[id_]
else:
raise ValueError("DataSource 'id' not found in CompositeDataSource collection.")
return
@property
def data_sources(self):
def get_all_data_sources(self):
"""Return all attached Data Sources
"""
return copy.deepcopy(self.data_sources.values())
return self.data_sources.values()

View File

@ -19,7 +19,7 @@ from stix2.sources import DataSink, DataSource, DataStore, Filter
class FileSystemStore(DataStore):
"""
"""
def __init__(self, stix_dir="stix_data", name="FileSystemStore"):
def __init__(self, name="FileSystemStore", stix_dir="stix_data"):
super(FileSystemStore, self).__init__(name=name)
self.source = FileSystemSource(stix_dir=stix_dir)
self.sink = FileSystemSink(stix_dir=stix_dir)
@ -28,7 +28,7 @@ class FileSystemStore(DataStore):
class FileSystemSink(DataSink):
"""
"""
def __init__(self, stix_dir="stix_data", name="FileSystemSink"):
def __init__(self, name="FileSystemSink", stix_dir="stix_data"):
super(FileSystemSink, self).__init__(name=name)
self.stix_dir = os.path.abspath(stix_dir)
@ -58,7 +58,7 @@ class FileSystemSink(DataSink):
class FileSystemSource(DataSource):
"""
"""
def __init__(self, stix_dir="stix_data", name="FileSystemSource"):
def __init__(self, name="FileSystemSource", stix_dir="stix_data"):
super(FileSystemSource, self).__init__(name=name)
self.stix_dir = os.path.abspath(stix_dir)
@ -71,8 +71,8 @@ class FileSystemSource(DataSource):
return self.stix_dir
@stix_dir.setter
def stix_dir(self, dir):
self.stix_dir = dir
def stix_dir(self, dir_):
self.stix_dir = dir_
def get(self, stix_id, _composite_filters=None):
"""

View File

@ -21,15 +21,16 @@ Notes:
import json
import os
from stix2validator import validate_string
from stix2 import Bundle
from stix2.sources import DataSink, DataSource, DataStore, Filter
from stix2validator import validate_string
class MemoryStore(DataStore):
"""
"""
def __init__(self, stix_data=None, name="MemoryStore"):
def __init__(self, name="MemoryStore", stix_data=None):
"""
Notes:
It doesn't make sense to create a MemoryStore by passing
@ -75,7 +76,7 @@ class MemoryStore(DataStore):
class MemorySink(DataSink):
"""
"""
def __init__(self, stix_data=None, name="MemorySink", _store=False):
def __init__(self, name="MemorySink", stix_data=None, _store=False):
"""
Args:
stix_data (dictionary OR list): valid STIX 2.0 content in
@ -150,7 +151,7 @@ class MemorySink(DataSink):
class MemorySource(DataSource):
def __init__(self, stix_data=None, name="MemorySource", _store=False):
def __init__(self, name="MemorySource", stix_data=None, _store=False):
"""
Args:
stix_data (dictionary OR list): valid STIX 2.0 content in
@ -177,8 +178,8 @@ class MemorySource(DataSource):
for stix_obj in stix_data["objects"]:
self.data[stix_obj["id"]] = stix_obj
else:
print("Error: json data passed to MemorySink() was found to not be validated by STIX 2 Validator")
print(r)
print("Error: json data passed to MemorySource() was found to not be validated by STIX 2 Validator")
print(r.as_dict())
self.data = {}
elif type(stix_data) == list:
# STIX objects are in a list

View File

@ -128,3 +128,19 @@ def test_parse_bundle():
assert bundle.objects[0].type == 'indicator'
assert bundle.objects[1].type == 'malware'
assert bundle.objects[2].type == 'relationship'
def test_parse_unknown_type():
unknown = {
"type": "other",
"id": "other--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"created": "2016-04-06T20:03:00Z",
"modified": "2016-04-06T20:03:00Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"name": "Green Group Attacks Against Finance",
}
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
stix2.parse(unknown)
assert str(excinfo.value) == "Can't parse unknown object type 'other'! For custom types, use the CustomObject decorator."

View File

@ -128,6 +128,22 @@ def test_custom_observable_object():
NewObservable(property1='something', property2=4)
def test_custom_no_properties_raises_exception():
with pytest.raises(ValueError):
@stix2.sdo.CustomObject('x-new-object-type')
class NewObject1(object):
pass
def test_custom_wrong_properties_arg_raises_exception():
with pytest.raises(ValueError):
@stix2.observables.CustomObservable('x-new-object-type', (("prop", stix2.properties.BooleanProperty())))
class NewObject2(object):
pass
def test_parse_custom_observable_object():
nt_string = """{
"type": "x-new-observable",

View File

@ -1,19 +1,16 @@
import pytest
from taxii2_client import Collection
from taxii2client import Collection
from stix2.sources import DataSource, Filter, taxii
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
DataStore, Filter, make_id, taxii)
from stix2.sources.memory import MemorySource
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
class MockTAXIIClient(object):
"""Mock for taxii2_client.TAXIIClient"""
def get(self):
return {}
def post(self):
return {}
pass
@pytest.fixture
@ -21,6 +18,127 @@ def collection():
return Collection(COLLECTION_URL, MockTAXIIClient())
STIX_OBJS1 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
STIX_OBJS2 = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
]
def test_ds_smoke():
ds1 = DataSource()
ds2 = DataSink()
ds3 = DataStore(source=ds1, sink=ds2)
with pytest.raises(NotImplementedError):
ds3.add(None)
with pytest.raises(NotImplementedError):
ds3.all_versions("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.get("malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")
with pytest.raises(NotImplementedError):
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
def test_ds_taxii(collection):
ds = taxii.TAXIICollectionSource(collection)
assert ds.name == 'TAXIICollectionSource'
@ -104,6 +222,8 @@ def test_add_get_remove_filter():
assert len(ds.filters) == 2
ds.add_filters(valid_filters)
def test_apply_common_filters():
stix_objs = [
@ -132,9 +252,21 @@ def test_apply_common_filters():
},
{
"created": "2014-05-08T09:00:00.000Z",
"granular_markings": [
{
"marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
"selectors": [
"relationship_type"
]
}
],
"id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
"modified": "2014-05-08T09:00:00.000Z",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"relationship_type": "indicates",
"revoked": True,
"source_ref": "indicator--a932fcc6-e032-176c-126f-cb970a5a1ade",
"target_ref": "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111",
"type": "relationship"
@ -145,6 +277,13 @@ def test_apply_common_filters():
Filter("type", "!=", "relationship"),
Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
Filter("labels", "in", "remote-access-trojan"),
Filter("created", ">", "2015-01-01T01:00:00.000Z"),
Filter("revoked", "=", True),
Filter("revoked", "!=", True),
Filter("revoked", "?", False),
Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
Filter("granular_markings.selectors", "in", "relationship_type"),
Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
]
ds = DataSource()
@ -160,73 +299,90 @@ def test_apply_common_filters():
resp = ds.apply_common_filters(stix_objs, [filters[2]])
assert resp[0]['id'] == stix_objs[0]['id']
resp = ds.apply_common_filters(stix_objs, [filters[3]])
assert resp[0]['id'] == stix_objs[0]['id']
assert len(resp) == 1
def test_deduplicate():
stix_objs = [
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
},
{
"created": "2017-01-27T13:49:53.935Z",
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
"labels": [
"url-watchlist"
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z"
}
resp = ds.apply_common_filters(stix_objs, [filters[4]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# Note that if 'revoked' property is not present in object.
# Currently we can't use such an expression to filter for...
resp = ds.apply_common_filters(stix_objs, [filters[5]])
assert len(resp) == 0
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(stix_objs, [filters[6]])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(filters[6].op,
filters[6].field)
resp = ds.apply_common_filters(stix_objs, [filters[7]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
resp = ds.apply_common_filters(stix_objs, [filters[8], filters[9]])
assert resp[0]['id'] == stix_objs[2]['id']
assert len(resp) == 1
# These are used with STIX_OBJS2
more_filters = [
Filter("modified", "<", "2017-01-28T13:49:53.935Z"),
Filter("modified", ">", "2017-01-28T13:49:53.935Z"),
Filter("modified", ">=", "2017-01-27T13:49:53.935Z"),
Filter("modified", "<=", "2017-01-27T13:49:53.935Z"),
Filter("modified", "?", "2017-01-27T13:49:53.935Z"),
Filter("id", "!=", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
Filter("id", "?", "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"),
Filter("notacommonproperty", "=", "bar"),
]
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[0]])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[1]])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[2]])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 3
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[3]])
assert resp[0]['id'] == STIX_OBJS2[1]['id']
assert len(resp) == 2
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[4]])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(more_filters[4].op,
more_filters[4].field)
resp = ds.apply_common_filters(STIX_OBJS2, [more_filters[5]])
assert resp[0]['id'] == STIX_OBJS2[0]['id']
assert len(resp) == 1
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[6]])
assert str(excinfo.value) == ("Error, filter operator: {0} not supported "
"for specified field: {1}").format(more_filters[6].op,
more_filters[6].field)
with pytest.raises(ValueError) as excinfo:
ds.apply_common_filters(STIX_OBJS2, [more_filters[7]])
assert str(excinfo.value) == ("Error, field: {0} is not supported for "
"filtering on.".format(more_filters[7].field))
def test_deduplicate():
ds = DataSource()
unique = ds.deduplicate(stix_objs)
unique = ds.deduplicate(STIX_OBJS1)
# Only 3 objects are unique
# 2 id's vary
@ -243,6 +399,56 @@ def test_deduplicate():
assert "2017-01-27T13:49:53.936Z" in mods
def test_add_remove_composite_datasource():
cds = CompositeDataSource()
ds1 = DataSource()
ds2 = DataSource()
ds3 = DataSink()
cds.add_data_source([ds1, ds2, ds1, ds3])
assert len(cds.get_all_data_sources()) == 2
cds.remove_data_source([ds1.id_, ds2.id_])
assert len(cds.get_all_data_sources()) == 0
with pytest.raises(ValueError):
cds.remove_data_source([ds3.id_])
def test_composite_datasource_operations():
BUNDLE1 = dict(id="bundle--%s" % make_id(),
objects=STIX_OBJS1,
spec_version="2.0",
type="bundle")
cds = CompositeDataSource()
ds1 = MemorySource(stix_data=BUNDLE1)
ds2 = MemorySource(stix_data=STIX_OBJS2)
cds.add_data_source([ds1, ds2])
indicators = cds.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
# In STIX_OBJS2 changed the 'modified' property to a later time...
assert len(indicators) == 2
indicator = cds.get("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
assert indicator["id"] == "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f"
assert indicator["modified"] == "2017-01-31T13:49:53.935Z"
assert indicator["type"] == "indicator"
query = [
Filter("type", "=", "indicator")
]
results = cds.query(query)
# STIX_OBJS2 has indicator with later time, one with different id, one with
# original time in STIX_OBJS1
assert len(results) == 3
# def test_data_source_file():
# ds = file.FileDataSource()
#

View File

@ -174,3 +174,23 @@ def test_parse_indicator(data):
assert idctr.valid_from == dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
assert idctr.labels[0] == "malicious-activity"
assert idctr.pattern == "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']"
def test_invalid_indicator_pattern():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.Indicator(
labels=['malicious-activity'],
pattern="file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e'",
)
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.prop_name == 'pattern'
assert 'input is missing square brackets' in excinfo.value.reason
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.Indicator(
labels=['malicious-activity'],
pattern='[file:hashes.MD5 = "d41d8cd98f00b204e9800998ecf8427e"]',
)
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.prop_name == 'pattern'
assert 'mismatched input' in excinfo.value.reason

View File

@ -75,7 +75,7 @@ def test_marking_def_example_with_tlp():
assert str(TLP_WHITE) == EXPECTED_TLP_MARKING_DEFINITION
def test_marking_def_example_with_statement():
def test_marking_def_example_with_statement_positional_argument():
marking_definition = stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
@ -86,12 +86,13 @@ def test_marking_def_example_with_statement():
assert str(marking_definition) == EXPECTED_STATEMENT_MARKING_DEFINITION
def test_marking_def_example_with_positional_statement():
def test_marking_def_example_with_kwargs_statement():
kwargs = dict(statement="Copyright 2016, Example Corp")
marking_definition = stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="statement",
definition=stix2.StatementMarking("Copyright 2016, Example Corp")
definition=stix2.StatementMarking(**kwargs)
)
assert str(marking_definition) == EXPECTED_STATEMENT_MARKING_DEFINITION
@ -180,4 +181,64 @@ def test_parse_marking_definition(data):
assert gm.definition_type == "tlp"
@stix2.common.CustomMarking('x-new-marking-type', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
])
class NewMarking(object):
def __init__(self, property2=None, **kwargs):
return
def test_registered_custom_marking():
nm = NewMarking(property1='something', property2=55)
marking_def = stix2.MarkingDefinition(
id="marking-definition--00000000-0000-0000-0000-000000000012",
created="2017-01-22T00:00:00.000Z",
definition_type="x-new-marking-type",
definition=nm
)
assert marking_def.type == "marking-definition"
assert marking_def.id == "marking-definition--00000000-0000-0000-0000-000000000012"
assert marking_def.created == dt.datetime(2017, 1, 22, 0, 0, 0, tzinfo=pytz.utc)
assert marking_def.definition.property1 == "something"
assert marking_def.definition.property2 == 55
assert marking_def.definition_type == "x-new-marking-type"
def test_not_registered_marking_raises_exception():
with pytest.raises(ValueError) as excinfo:
# Used custom object on purpose to demonstrate a not-registered marking
@stix2.sdo.CustomObject('x-new-marking-type2', [
('property1', stix2.properties.StringProperty(required=True)),
('property2', stix2.properties.IntegerProperty()),
])
class NewObject2(object):
def __init__(self, property2=None, **kwargs):
return
no = NewObject2(property1='something', property2=55)
stix2.MarkingDefinition(
id="marking-definition--00000000-0000-0000-0000-000000000012",
created="2017-01-22T00:00:00.000Z",
definition_type="x-new-marking-type2",
definition=no
)
assert str(excinfo.value) == "definition_type must be a valid marking type"
def test_marking_wrong_type_construction():
with pytest.raises(ValueError) as excinfo:
# Test passing wrong type for properties.
@stix2.CustomMarking('x-new-marking-type2', ("a", "b"))
class NewObject3(object):
pass
assert str(excinfo.value) == "Must supply a list, containing tuples. For example, [('property1', IntegerProperty())]"
# TODO: Add other examples

View File

@ -8,6 +8,8 @@ import stix2
from .constants import OBSERVED_DATA_ID
OBJECTS_REGEX = re.compile('\"objects\": {(?:.*?)(?:(?:[^{]*?)|(?:{[^{]*?}))*}', re.DOTALL)
EXPECTED = """{
"type": "observed-data",
@ -173,7 +175,7 @@ def test_parse_observed_data(data):
}""",
])
def test_parse_artifact_valid(data):
odata_str = re.compile('"objects".+\}', re.DOTALL).sub('"objects": { %s }\n}' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["0"].type == "artifact"
@ -194,7 +196,7 @@ def test_parse_artifact_valid(data):
}""",
])
def test_parse_artifact_invalid(data):
odata_str = re.compile('"objects".+\}', re.DOTALL).sub('"objects": { %s }\n}' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
with pytest.raises(ValueError):
stix2.parse(odata_str)
@ -204,6 +206,7 @@ def test_artifact_example_dependency_error():
stix2.Artifact(url="http://example.com/sirvizio.exe")
assert excinfo.value.dependencies == [("hashes", "url")]
assert str(excinfo.value) == "The property dependencies for Artifact: (hashes, url) are not met."
@pytest.mark.parametrize("data", [
@ -215,7 +218,7 @@ def test_artifact_example_dependency_error():
}""",
])
def test_parse_autonomous_system_valid(data):
odata_str = re.compile('"objects".+\}', re.DOTALL).sub('"objects": { %s }\n}' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["0"].type == "autonomous-system"
assert odata.objects["0"].number == 15139
@ -358,7 +361,7 @@ def test_parse_email_message_not_multipart(data):
}""",
])
def test_parse_file_archive(data):
odata_str = re.compile('"objects".+\}', re.DOTALL).sub('"objects": { %s }\n}' % data, EXPECTED)
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["3"].extensions['archive-ext'].version == "5.0"
@ -555,6 +558,7 @@ def test_artifact_mutual_exclusion_error():
assert excinfo.value.cls == stix2.Artifact
assert excinfo.value.properties == ["payload_bin", "url"]
assert str(excinfo.value) == "The (payload_bin, url) properties for Artifact are mutually exclusive."
def test_directory_example():
@ -925,6 +929,10 @@ def test_process_example_empty_error():
properties_of_process = list(stix2.Process._properties.keys())
properties_of_process.remove("type")
assert excinfo.value.properties == sorted(properties_of_process)
msg = "At least one of the ({1}) properties for {0} must be populated."
msg = msg.format(stix2.Process.__name__,
", ".join(sorted(properties_of_process)))
assert str(excinfo.value) == msg
def test_process_example_empty_with_extensions():

View File

@ -206,15 +206,22 @@ def test_dictionary_property_valid(d):
@pytest.mark.parametrize("d", [
{'a': 'something'},
{'a'*300: 'something'},
{'Hey!': 'something'},
[{'a': 'something'}, "Invalid dictionary key a: (shorter than 3 characters)."],
[{'a'*300: 'something'}, "Invalid dictionary key aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
"aaaaaaaaaaaaaaaaaaaaaaa: (longer than 256 characters)."],
[{'Hey!': 'something'}, "Invalid dictionary key Hey!: (contains characters other thanlowercase a-z, "
"uppercase A-Z, numerals 0-9, hyphen (-), or underscore (_))."],
])
def test_dictionary_property_invalid(d):
dict_prop = DictionaryProperty()
with pytest.raises(DictionaryKeyError):
dict_prop.clean(d)
with pytest.raises(DictionaryKeyError) as excinfo:
dict_prop.clean(d[0])
assert str(excinfo.value) == d[1]
@pytest.mark.parametrize("value", [

View File

@ -128,6 +128,11 @@ def test_versioning_error_bad_modified_value():
assert excinfo.value.prop_name == "modified"
assert excinfo.value.reason == "The new modified datetime cannot be before the current modified datatime."
msg = "Invalid value for {0} '{1}': {2}"
msg = msg.format(stix2.Campaign.__name__, "modified",
"The new modified datetime cannot be before the current modified datatime.")
assert str(excinfo.value) == msg
def test_versioning_error_usetting_required_property():
campaign_v1 = stix2.Campaign(
@ -145,6 +150,10 @@ def test_versioning_error_usetting_required_property():
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.properties == ["name"]
msg = "No values for required properties for {0}: ({1})."
msg = msg.format(stix2.Campaign.__name__, "name")
assert str(excinfo.value) == msg
def test_versioning_error_new_version_of_revoked():
campaign_v1 = stix2.Campaign(
@ -162,6 +171,7 @@ def test_versioning_error_new_version_of_revoked():
campaign_v2.new_version(name="barney")
assert excinfo.value.called_by == "new_version"
assert str(excinfo.value) == "Cannot create a new version of a revoked object."
def test_versioning_error_revoke_of_revoked():
@ -180,3 +190,4 @@ def test_versioning_error_revoke_of_revoked():
campaign_v2.revoke()
assert excinfo.value.called_by == "revoke"
assert str(excinfo.value) == "Cannot revoke an already revoked object."

View File

@ -133,7 +133,7 @@ def find_property_index(obj, properties, tuple_to_find):
for item in pv:
if isinstance(item, _STIXBase):
val = find_property_index(item,
item._object_properties(),
item.object_properties(),
tuple_to_find)
if val is not None:
return val
@ -146,7 +146,7 @@ def find_property_index(obj, properties, tuple_to_find):
for item in pv.values():
if isinstance(item, _STIXBase):
val = find_property_index(item,
item._object_properties(),
item.object_properties(),
tuple_to_find)
if val is not None:
return val