Merge branch 'master' into 76-filesystem-bundles

stix2.0
Chris Lenk 2017-10-31 14:10:29 -04:00 committed by GitHub
commit d93bf44c00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 124 additions and 2321 deletions

View File

@ -9,7 +9,6 @@ known_third_party =
simplejson simplejson
six, six,
stix2patterns, stix2patterns,
stix2validator,
taxii2client, taxii2client,
known_first_party = stix2 known_first_party = stix2
force_sort_within_sections = 1 force_sort_within_sections = 1

View File

@ -13,7 +13,7 @@ including data markings, versioning, and for resolving STIX IDs across
multiple data sources. multiple data sources.
For more information, see `the For more information, see `the
documentation <https://stix2.readthedocs.io/en/latest/>`__ on documentation <https://stix2.readthedocs.io/>`__ on
ReadTheDocs. ReadTheDocs.
Installation Installation

File diff suppressed because it is too large Load Diff

View File

@ -53,7 +53,6 @@ setup(
'simplejson', 'simplejson',
'six', 'six',
'stix2-patterns', 'stix2-patterns',
'stix2-validator',
'taxii2-client', 'taxii2-client',
], ],
) )

View File

@ -152,3 +152,22 @@ class Environment(object):
def parse(self, *args, **kwargs): def parse(self, *args, **kwargs):
return _parse(*args, **kwargs) return _parse(*args, **kwargs)
parse.__doc__ = _parse.__doc__ parse.__doc__ = _parse.__doc__
def creator_of(self, obj):
"""Retrieve the Identity refered to by the object's `created_by_ref`.
Args:
obj: The STIX object whose `created_by_ref` property will be looked
up.
Returns:
The STIX object's creator, or
None, if the object contains no `created_by_ref` property or the
object's creator cannot be found.
"""
creator_id = obj.get('created_by_ref', '')
if creator_id:
return self.get(creator_id)
else:
return None

View File

@ -280,6 +280,8 @@ class CompositeDataSource(DataSource):
# remove duplicate versions # remove duplicate versions
if len(all_data) > 0: if len(all_data) > 0:
all_data = deduplicate(all_data) all_data = deduplicate(all_data)
else:
return None
# reduce to most recent version # reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0] stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]

View File

@ -229,10 +229,13 @@ class MemorySource(DataSource):
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom) all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
# reduce to most recent version if all_data:
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0] # reduce to most recent version
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
return stix_obj return stix_obj
else:
return None
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False): def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
"""Retrieve STIX objects from in-memory dict via STIX ID, all versions of it """Retrieve STIX objects from in-memory dict via STIX ID, all versions of it

View File

@ -1,9 +1,5 @@
""" """
Python STIX 2.0 TAXII Source/Sink Python STIX 2.x TaxiiCollectionStore
TODO:
Test everything
""" """
from stix2.base import _STIXBase from stix2.base import _STIXBase
@ -125,8 +121,10 @@ class TAXIICollectionSource(DataSource):
stix_obj = list(apply_common_filters(stix_objs, query)) stix_obj = list(apply_common_filters(stix_objs, query))
if len(stix_obj): if len(stix_obj):
stix_obj = stix_obj[0] stix_obj = parse(stix_obj[0], allow_custom=allow_custom)
stix_obj = parse(stix_obj, allow_custom=allow_custom) if stix_obj.id != stix_id:
# check - was added to handle erroneous TAXII servers
stix_obj = None
else: else:
stix_obj = None stix_obj = None
@ -155,7 +153,13 @@ class TAXIICollectionSource(DataSource):
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom) all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
return all_data # parse STIX objects from TAXII returned json
all_data = [parse(stix_obj) for stix_obj in all_data]
# check - was added to handle erroneous TAXII servers
all_data_clean = [stix_obj for stix_obj in all_data if stix_obj.id == stix_id]
return all_data_clean
def query(self, query=None, _composite_filters=None, allow_custom=False): def query(self, query=None, _composite_filters=None, allow_custom=False):
"""Search and retreive STIX objects based on the complete query """Search and retreive STIX objects based on the complete query
@ -183,7 +187,7 @@ class TAXIICollectionSource(DataSource):
if not isinstance(query, list): if not isinstance(query, list):
# make sure dont make set from a Filter object, # make sure dont make set from a Filter object,
# need to make a set from a list of Filter objects (even if just one Filter) # need to make a set from a list of Filter objects (even if just one Filter)
query = list(query) query = [query]
query = set(query) query = set(query)
# combine all query filters # combine all query filters

View File

@ -184,3 +184,35 @@ def test_parse_malware():
assert mal.modified == FAKE_TIME assert mal.modified == FAKE_TIME
assert mal.labels == ['ransomware'] assert mal.labels == ['ransomware']
assert mal.name == "Cryptolocker" assert mal.name == "Cryptolocker"
def test_created_by():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
env.add(identity)
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
creator = env.creator_of(ind)
assert creator is identity
def test_created_by_no_datasource():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(factory=factory)
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
with pytest.raises(AttributeError) as excinfo:
env.creator_of(ind)
assert 'Environment has no data source' in str(excinfo.value)
def test_created_by_not_found():
identity = stix2.Identity(**IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=identity.id)
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
creator = env.creator_of(ind)
assert creator is None