Merge master
commit
a17d770d34
|
@ -9,7 +9,6 @@ known_third_party =
|
||||||
simplejson
|
simplejson
|
||||||
six,
|
six,
|
||||||
stix2patterns,
|
stix2patterns,
|
||||||
stix2validator,
|
|
||||||
taxii2client,
|
taxii2client,
|
||||||
known_first_party = stix2
|
known_first_party = stix2
|
||||||
force_sort_within_sections = 1
|
force_sort_within_sections = 1
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
1
setup.py
1
setup.py
|
@ -53,7 +53,6 @@ setup(
|
||||||
'simplejson',
|
'simplejson',
|
||||||
'six',
|
'six',
|
||||||
'stix2-patterns',
|
'stix2-patterns',
|
||||||
'stix2-validator',
|
|
||||||
'taxii2-client',
|
'taxii2-client',
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -40,7 +40,14 @@ class _STIXBase(collections.Mapping):
|
||||||
"""Base class for STIX object types"""
|
"""Base class for STIX object types"""
|
||||||
|
|
||||||
def object_properties(self):
|
def object_properties(self):
|
||||||
return list(self._properties.keys())
|
props = set(self._properties.keys())
|
||||||
|
custom_props = list(set(self._inner.keys()) - props)
|
||||||
|
custom_props.sort()
|
||||||
|
|
||||||
|
all_properties = list(self._properties.keys())
|
||||||
|
all_properties.extend(custom_props) # Any custom properties to the bottom
|
||||||
|
|
||||||
|
return all_properties
|
||||||
|
|
||||||
def _check_property(self, prop_name, prop, kwargs):
|
def _check_property(self, prop_name, prop, kwargs):
|
||||||
if prop_name not in kwargs:
|
if prop_name not in kwargs:
|
||||||
|
|
|
@ -9,7 +9,7 @@ import stix2
|
||||||
from . import exceptions
|
from . import exceptions
|
||||||
from .base import _STIXBase
|
from .base import _STIXBase
|
||||||
from .properties import IDProperty, ListProperty, Property, TypeProperty
|
from .properties import IDProperty, ListProperty, Property, TypeProperty
|
||||||
from .utils import get_dict
|
from .utils import get_class_hierarchy_names, get_dict
|
||||||
|
|
||||||
|
|
||||||
class STIXObjectProperty(Property):
|
class STIXObjectProperty(Property):
|
||||||
|
@ -19,6 +19,11 @@ class STIXObjectProperty(Property):
|
||||||
super(STIXObjectProperty, self).__init__()
|
super(STIXObjectProperty, self).__init__()
|
||||||
|
|
||||||
def clean(self, value):
|
def clean(self, value):
|
||||||
|
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
|
||||||
|
# a bundle with no further checks.
|
||||||
|
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
|
||||||
|
for x in get_class_hierarchy_names(value)):
|
||||||
|
return value
|
||||||
try:
|
try:
|
||||||
dictified = get_dict(value)
|
dictified = get_dict(value)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
|
|
@ -152,3 +152,22 @@ class Environment(object):
|
||||||
def parse(self, *args, **kwargs):
|
def parse(self, *args, **kwargs):
|
||||||
return _parse(*args, **kwargs)
|
return _parse(*args, **kwargs)
|
||||||
parse.__doc__ = _parse.__doc__
|
parse.__doc__ = _parse.__doc__
|
||||||
|
|
||||||
|
def creator_of(self, obj):
|
||||||
|
"""Retrieve the Identity refered to by the object's `created_by_ref`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj: The STIX object whose `created_by_ref` property will be looked
|
||||||
|
up.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The STIX object's creator, or
|
||||||
|
None, if the object contains no `created_by_ref` property or the
|
||||||
|
object's creator cannot be found.
|
||||||
|
|
||||||
|
"""
|
||||||
|
creator_id = obj.get('created_by_ref', '')
|
||||||
|
if creator_id:
|
||||||
|
return self.get(creator_id)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
|
@ -44,36 +44,40 @@ class DataStore(object):
|
||||||
self.source = source
|
self.source = source
|
||||||
self.sink = sink
|
self.sink = sink
|
||||||
|
|
||||||
def get(self, stix_id):
|
def get(self, stix_id, allow_custom=False):
|
||||||
"""Retrieve the most recent version of a single STIX object by ID.
|
"""Retrieve the most recent version of a single STIX object by ID.
|
||||||
|
|
||||||
Translate get() call to the appropriate DataSource call.
|
Translate get() call to the appropriate DataSource call.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): the id of the STIX object to retrieve.
|
stix_id (str): the id of the STIX object to retrieve.
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_obj: the single most recent version of the STIX
|
stix_obj: the single most recent version of the STIX
|
||||||
object specified by the "id".
|
object specified by the "id".
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.get(stix_id)
|
return self.source.get(stix_id, allow_custom=allow_custom)
|
||||||
|
|
||||||
def all_versions(self, stix_id):
|
def all_versions(self, stix_id, allow_custom=False):
|
||||||
"""Retrieve all versions of a single STIX object by ID.
|
"""Retrieve all versions of a single STIX object by ID.
|
||||||
|
|
||||||
Implement: Translate all_versions() call to the appropriate DataSource call
|
Implement: Translate all_versions() call to the appropriate DataSource call
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): the id of the STIX object to retrieve.
|
stix_id (str): the id of the STIX object to retrieve.
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self.source.all_versions(stix_id)
|
return self.source.all_versions(stix_id, allow_custom=allow_custom)
|
||||||
|
|
||||||
def query(self, query):
|
def query(self, query=None, allow_custom=False):
|
||||||
"""Retrieve STIX objects matching a set of filters.
|
"""Retrieve STIX objects matching a set of filters.
|
||||||
|
|
||||||
Implement: Specific data source API calls, processing,
|
Implement: Specific data source API calls, processing,
|
||||||
|
@ -82,6 +86,8 @@ class DataStore(object):
|
||||||
Args:
|
Args:
|
||||||
query (list): a list of filters (which collectively are the query)
|
query (list): a list of filters (which collectively are the query)
|
||||||
to conduct search on.
|
to conduct search on.
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
@ -89,15 +95,17 @@ class DataStore(object):
|
||||||
"""
|
"""
|
||||||
return self.source.query(query=query)
|
return self.source.query(query=query)
|
||||||
|
|
||||||
def add(self, stix_objs):
|
def add(self, stix_objs, allow_custom=False):
|
||||||
"""Store STIX objects.
|
"""Store STIX objects.
|
||||||
|
|
||||||
Translates add() to the appropriate DataSink call.
|
Translates add() to the appropriate DataSink call.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
"""
|
"""
|
||||||
return self.sink.add(stix_objs)
|
return self.sink.add(stix_objs, allow_custom=allow_custom)
|
||||||
|
|
||||||
|
|
||||||
class DataSink(object):
|
class DataSink(object):
|
||||||
|
@ -111,7 +119,7 @@ class DataSink(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.id = make_id()
|
self.id = make_id()
|
||||||
|
|
||||||
def add(self, stix_objs):
|
def add(self, stix_objs, allow_custom=False):
|
||||||
"""Store STIX objects.
|
"""Store STIX objects.
|
||||||
|
|
||||||
Implement: Specific data sink API calls, processing,
|
Implement: Specific data sink API calls, processing,
|
||||||
|
@ -120,6 +128,8 @@ class DataSink(object):
|
||||||
Args:
|
Args:
|
||||||
stix_objs (list): a list of STIX objects (where each object is a
|
stix_objs (list): a list of STIX objects (where each object is a
|
||||||
STIX object)
|
STIX object)
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
@ -139,7 +149,7 @@ class DataSource(object):
|
||||||
self.id = make_id()
|
self.id = make_id()
|
||||||
self.filters = set()
|
self.filters = set()
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""
|
"""
|
||||||
Implement: Specific data source API calls, processing,
|
Implement: Specific data source API calls, processing,
|
||||||
functionality required for retrieving data from the data source
|
functionality required for retrieving data from the data source
|
||||||
|
@ -148,9 +158,10 @@ class DataSource(object):
|
||||||
stix_id (str): the id of the STIX 2.0 object to retrieve. Should
|
stix_id (str): the id of the STIX 2.0 object to retrieve. Should
|
||||||
return a single object, the most recent version of the object
|
return a single object, the most recent version of the object
|
||||||
specified by the "id".
|
specified by the "id".
|
||||||
|
|
||||||
_composite_filters (set): set of filters passed from the parent
|
_composite_filters (set): set of filters passed from the parent
|
||||||
the CompositeDataSource, not user supplied
|
the CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_obj: the STIX object
|
stix_obj: the STIX object
|
||||||
|
@ -158,7 +169,7 @@ class DataSource(object):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""
|
"""
|
||||||
Implement: Similar to get() except returns list of all object versions of
|
Implement: Similar to get() except returns list of all object versions of
|
||||||
the specified "id". In addition, implement the specific data
|
the specified "id". In addition, implement the specific data
|
||||||
|
@ -169,9 +180,10 @@ class DataSource(object):
|
||||||
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
|
stix_id (str): The id of the STIX 2.0 object to retrieve. Should
|
||||||
return a list of objects, all the versions of the object
|
return a list of objects, all the versions of the object
|
||||||
specified by the "id".
|
specified by the "id".
|
||||||
|
|
||||||
_composite_filters (set): set of filters passed from the parent
|
_composite_filters (set): set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
@ -179,7 +191,7 @@ class DataSource(object):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def query(self, query, _composite_filters=None):
|
def query(self, query=None, _composite_filters=None, allow_custom=False):
|
||||||
"""
|
"""
|
||||||
Implement:Implement the specific data source API calls, processing,
|
Implement:Implement the specific data source API calls, processing,
|
||||||
functionality required for retrieving query from the data source
|
functionality required for retrieving query from the data source
|
||||||
|
@ -187,9 +199,10 @@ class DataSource(object):
|
||||||
Args:
|
Args:
|
||||||
query (list): a list of filters (which collectively are the query)
|
query (list): a list of filters (which collectively are the query)
|
||||||
to conduct search on
|
to conduct search on
|
||||||
|
|
||||||
_composite_filters (set): a set of filters passed from the parent
|
_composite_filters (set): a set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_objs (list): a list of STIX objects
|
stix_objs (list): a list of STIX objects
|
||||||
|
@ -224,7 +237,7 @@ class CompositeDataSource(DataSource):
|
||||||
super(CompositeDataSource, self).__init__()
|
super(CompositeDataSource, self).__init__()
|
||||||
self.data_sources = []
|
self.data_sources = []
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""Retrieve STIX object by STIX ID
|
"""Retrieve STIX object by STIX ID
|
||||||
|
|
||||||
Federated retrieve method, iterates through all DataSources
|
Federated retrieve method, iterates through all DataSources
|
||||||
|
@ -238,10 +251,11 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): the id of the STIX object to retrieve.
|
stix_id (str): the id of the STIX object to retrieve.
|
||||||
|
|
||||||
_composite_filters (list): a list of filters passed from a
|
_composite_filters (list): a list of filters passed from a
|
||||||
CompositeDataSource (i.e. if this CompositeDataSource is attached
|
CompositeDataSource (i.e. if this CompositeDataSource is attached
|
||||||
to another parent CompositeDataSource), not user supplied
|
to another parent CompositeDataSource), not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
stix_obj: the STIX object to be returned.
|
stix_obj: the STIX object to be returned.
|
||||||
|
@ -259,20 +273,22 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
# for every configured Data Source, call its retrieve handler
|
# for every configured Data Source, call its retrieve handler
|
||||||
for ds in self.data_sources:
|
for ds in self.data_sources:
|
||||||
data = ds.get(stix_id=stix_id, _composite_filters=all_filters)
|
data = ds.get(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
|
||||||
if data:
|
if data:
|
||||||
all_data.append(data)
|
all_data.append(data)
|
||||||
|
|
||||||
# remove duplicate versions
|
# remove duplicate versions
|
||||||
if len(all_data) > 0:
|
if len(all_data) > 0:
|
||||||
all_data = deduplicate(all_data)
|
all_data = deduplicate(all_data)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
# reduce to most recent version
|
# reduce to most recent version
|
||||||
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]
|
stix_obj = sorted(all_data, key=lambda k: k['modified'], reverse=True)[0]
|
||||||
|
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""Retrieve STIX objects by STIX ID
|
"""Retrieve STIX objects by STIX ID
|
||||||
|
|
||||||
Federated all_versions retrieve method - iterates through all DataSources
|
Federated all_versions retrieve method - iterates through all DataSources
|
||||||
|
@ -283,10 +299,11 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): id of the STIX objects to retrieve
|
stix_id (str): id of the STIX objects to retrieve
|
||||||
|
|
||||||
_composite_filters (list): a list of filters passed from a
|
_composite_filters (list): a list of filters passed from a
|
||||||
CompositeDataSource (i.e. if this CompositeDataSource is attached
|
CompositeDataSource (i.e. if this CompositeDataSource is attached
|
||||||
to a parent CompositeDataSource), not user supplied
|
to a parent CompositeDataSource), not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
all_data (list): list of STIX objects that have the specified id
|
all_data (list): list of STIX objects that have the specified id
|
||||||
|
@ -305,7 +322,7 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
# retrieve STIX objects from all configured data sources
|
# retrieve STIX objects from all configured data sources
|
||||||
for ds in self.data_sources:
|
for ds in self.data_sources:
|
||||||
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
|
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters, allow_custom=allow_custom)
|
||||||
all_data.extend(data)
|
all_data.extend(data)
|
||||||
|
|
||||||
# remove exact duplicates (where duplicates are STIX 2.0 objects
|
# remove exact duplicates (where duplicates are STIX 2.0 objects
|
||||||
|
@ -315,7 +332,7 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
return all_data
|
return all_data
|
||||||
|
|
||||||
def query(self, query=None, _composite_filters=None):
|
def query(self, query=None, _composite_filters=None, allow_custom=False):
|
||||||
"""Retrieve STIX objects that match query
|
"""Retrieve STIX objects that match query
|
||||||
|
|
||||||
Federate the query to all DataSources attached to the
|
Federate the query to all DataSources attached to the
|
||||||
|
@ -323,10 +340,11 @@ class CompositeDataSource(DataSource):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
query (list): list of filters to search on
|
query (list): list of filters to search on
|
||||||
|
|
||||||
_composite_filters (list): a list of filters passed from a
|
_composite_filters (list): a list of filters passed from a
|
||||||
CompositeDataSource (i.e. if this CompositeDataSource is attached
|
CompositeDataSource (i.e. if this CompositeDataSource is attached
|
||||||
to a parent CompositeDataSource), not user supplied
|
to a parent CompositeDataSource), not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
all_data (list): list of STIX objects to be returned
|
all_data (list): list of STIX objects to be returned
|
||||||
|
@ -351,7 +369,7 @@ class CompositeDataSource(DataSource):
|
||||||
# federate query to all attached data sources,
|
# federate query to all attached data sources,
|
||||||
# pass composite filters to id
|
# pass composite filters to id
|
||||||
for ds in self.data_sources:
|
for ds in self.data_sources:
|
||||||
data = ds.query(query=query, _composite_filters=all_filters)
|
data = ds.query(query=query, _composite_filters=all_filters, allow_custom=allow_custom)
|
||||||
all_data.extend(data)
|
all_data.extend(data)
|
||||||
|
|
||||||
# remove exact duplicates (where duplicates are STIX 2.0
|
# remove exact duplicates (where duplicates are STIX 2.0
|
||||||
|
|
|
@ -8,51 +8,51 @@ TODO:
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from stix2.base import _STIXBase
|
|
||||||
from stix2.core import Bundle, parse
|
from stix2.core import Bundle, parse
|
||||||
from stix2.sources import DataSink, DataSource, DataStore
|
from stix2.sources import DataSink, DataSource, DataStore
|
||||||
from stix2.sources.filters import Filter, apply_common_filters
|
from stix2.sources.filters import Filter, apply_common_filters
|
||||||
from stix2.utils import deduplicate
|
from stix2.utils import deduplicate, get_class_hierarchy_names
|
||||||
|
|
||||||
|
|
||||||
class FileSystemStore(DataStore):
|
class FileSystemStore(DataStore):
|
||||||
"""FileSystemStore
|
"""Interface to a file directory of STIX objects.
|
||||||
|
|
||||||
Provides an interface to an file directory of STIX objects.
|
|
||||||
FileSystemStore is a wrapper around a paired FileSystemSink
|
FileSystemStore is a wrapper around a paired FileSystemSink
|
||||||
and FileSystemSource.
|
and FileSystemSource.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_dir (str): path to directory of STIX objects
|
stix_dir (str): path to directory of STIX objects
|
||||||
|
bundlify (bool): Whether to wrap objects in bundles when saving them.
|
||||||
|
Default: False.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
source (FileSystemSource): FuleSystemSource
|
source (FileSystemSource): FuleSystemSource
|
||||||
|
|
||||||
sink (FileSystemSink): FileSystemSink
|
sink (FileSystemSink): FileSystemSink
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, stix_dir):
|
def __init__(self, stix_dir, bundlify=False):
|
||||||
super(FileSystemStore, self).__init__()
|
super(FileSystemStore, self).__init__()
|
||||||
self.source = FileSystemSource(stix_dir=stix_dir)
|
self.source = FileSystemSource(stix_dir=stix_dir)
|
||||||
self.sink = FileSystemSink(stix_dir=stix_dir)
|
self.sink = FileSystemSink(stix_dir=stix_dir, bundlify=bundlify)
|
||||||
|
|
||||||
|
|
||||||
class FileSystemSink(DataSink):
|
class FileSystemSink(DataSink):
|
||||||
"""FileSystemSink
|
"""Interface for adding/pushing STIX objects to file directory of STIX
|
||||||
|
objects.
|
||||||
Provides an interface for adding/pushing STIX objects
|
|
||||||
to file directory of STIX objects.
|
|
||||||
|
|
||||||
Can be paired with a FileSystemSource, together as the two
|
Can be paired with a FileSystemSource, together as the two
|
||||||
components of a FileSystemStore.
|
components of a FileSystemStore.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_dir (str): path to directory of STIX objects
|
stix_dir (str): path to directory of STIX objects.
|
||||||
|
bundlify (bool): Whether to wrap objects in bundles when saving them.
|
||||||
|
Default: False.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, stix_dir):
|
def __init__(self, stix_dir, bundlify=False):
|
||||||
super(FileSystemSink, self).__init__()
|
super(FileSystemSink, self).__init__()
|
||||||
self._stix_dir = os.path.abspath(stix_dir)
|
self._stix_dir = os.path.abspath(stix_dir)
|
||||||
|
self.bundlify = bundlify
|
||||||
|
|
||||||
if not os.path.exists(self._stix_dir):
|
if not os.path.exists(self._stix_dir):
|
||||||
raise ValueError("directory path for STIX data does not exist")
|
raise ValueError("directory path for STIX data does not exist")
|
||||||
|
@ -61,66 +61,72 @@ class FileSystemSink(DataSink):
|
||||||
def stix_dir(self):
|
def stix_dir(self):
|
||||||
return self._stix_dir
|
return self._stix_dir
|
||||||
|
|
||||||
|
def _check_path_and_write(self, stix_obj):
|
||||||
|
"""Write the given STIX object to a file in the STIX file directory.
|
||||||
|
"""
|
||||||
|
path = os.path.join(self._stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
|
||||||
|
|
||||||
|
if not os.path.exists(os.path.dirname(path)):
|
||||||
|
os.makedirs(os.path.dirname(path))
|
||||||
|
|
||||||
|
if self.bundlify:
|
||||||
|
stix_obj = Bundle(stix_obj)
|
||||||
|
|
||||||
|
with open(path, "w") as f:
|
||||||
|
f.write(str(stix_obj))
|
||||||
|
|
||||||
def add(self, stix_data=None, allow_custom=False, version=None):
|
def add(self, stix_data=None, allow_custom=False, version=None):
|
||||||
"""add STIX objects to file directory
|
"""Add STIX objects to file directory.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
|
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
|
||||||
in a STIX object(or list of), dict (or list of), or a STIX 2.0
|
in a STIX object (or list of), dict (or list of), or a STIX 2.0
|
||||||
json encoded string
|
json encoded string.
|
||||||
allow_custom (bool): Whether to allow custom properties or not.
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
Default: False.
|
not. Default: False.
|
||||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||||
None, use latest version.
|
None, use latest version.
|
||||||
|
|
||||||
TODO: Bundlify STIX content or no? When dumping to disk.
|
Note:
|
||||||
|
``stix_data`` can be a Bundle object, but each object in it will be
|
||||||
|
saved separately; you will be able to retrieve any of the objects
|
||||||
|
the Bundle contained, but not the Bundle itself.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def _check_path_and_write(stix_dir, stix_obj):
|
if any(x in ('STIXDomainObject', 'STIXRelationshipObject', 'MarkingDefinition')
|
||||||
path = os.path.join(stix_dir, stix_obj["type"], stix_obj["id"] + ".json")
|
for x in get_class_hierarchy_names(stix_data)):
|
||||||
|
|
||||||
if not os.path.exists(os.path.dirname(path)):
|
|
||||||
os.makedirs(os.path.dirname(path))
|
|
||||||
|
|
||||||
with open(path, "w") as f:
|
|
||||||
# Bundle() can take dict or STIX obj as argument
|
|
||||||
f.write(str(Bundle(stix_obj)))
|
|
||||||
|
|
||||||
if isinstance(stix_data, _STIXBase):
|
|
||||||
# adding python STIX object
|
# adding python STIX object
|
||||||
_check_path_and_write(self._stix_dir, stix_data)
|
self._check_path_and_write(stix_data)
|
||||||
|
|
||||||
elif isinstance(stix_data, dict):
|
elif isinstance(stix_data, (str, dict)):
|
||||||
|
stix_data = parse(stix_data, allow_custom, version)
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
# adding json-formatted Bundle - extracting STIX objects
|
# extract STIX objects
|
||||||
for stix_obj in stix_data["objects"]:
|
for stix_obj in stix_data.get("objects", []):
|
||||||
self.add(stix_obj)
|
self.add(stix_obj)
|
||||||
else:
|
else:
|
||||||
# adding json-formatted STIX
|
# adding json-formatted STIX
|
||||||
_check_path_and_write(self._stix_dir, stix_data)
|
self._check_path_and_write(stix_data)
|
||||||
|
|
||||||
elif isinstance(stix_data, str):
|
elif isinstance(stix_data, Bundle):
|
||||||
# adding json encoded string of STIX content
|
# recursively add individual STIX objects
|
||||||
stix_data = parse(stix_data, allow_custom, version)
|
for stix_obj in stix_data.get("objects", []):
|
||||||
if stix_data["type"] == "bundle":
|
self.add(stix_obj)
|
||||||
for stix_obj in stix_data["objects"]:
|
|
||||||
self.add(stix_obj)
|
|
||||||
else:
|
|
||||||
self.add(stix_data)
|
|
||||||
|
|
||||||
elif isinstance(stix_data, list):
|
elif isinstance(stix_data, list):
|
||||||
# if list, recurse call on individual STIX objects
|
# recursively add individual STIX objects
|
||||||
for stix_obj in stix_data:
|
for stix_obj in stix_data:
|
||||||
self.add(stix_obj)
|
self.add(stix_obj)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError("stix_data must be a STIX object(or list of), json formatted STIX(or list of) or a json formatted STIX bundle")
|
raise TypeError("stix_data must be a STIX object (or list of), "
|
||||||
|
"JSON formatted STIX (or list of), "
|
||||||
|
"or a JSON formatted STIX bundle")
|
||||||
|
|
||||||
|
|
||||||
class FileSystemSource(DataSource):
|
class FileSystemSource(DataSource):
|
||||||
"""FileSystemSource
|
"""Interface for searching/retrieving STIX objects from a STIX object file
|
||||||
|
directory.
|
||||||
Provides an interface for searching/retrieving
|
|
||||||
STIX objects from a STIX object file directory.
|
|
||||||
|
|
||||||
Can be paired with a FileSystemSink, together as the two
|
Can be paired with a FileSystemSink, together as the two
|
||||||
components of a FileSystemStore.
|
components of a FileSystemStore.
|
||||||
|
@ -141,15 +147,14 @@ class FileSystemSource(DataSource):
|
||||||
return self._stix_dir
|
return self._stix_dir
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
|
||||||
"""retrieve STIX object from file directory via STIX ID
|
"""Retrieve STIX object from file directory via STIX ID.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
||||||
|
|
||||||
_composite_filters (set): set of filters passed from the parent
|
_composite_filters (set): set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied.
|
CompositeDataSource, not user supplied
|
||||||
allow_custom (bool): Whether to allow custom properties or not.
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
Default: False.
|
or not. Default: False.
|
||||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||||
None, use latest version.
|
None, use latest version.
|
||||||
|
|
||||||
|
@ -161,48 +166,53 @@ class FileSystemSource(DataSource):
|
||||||
"""
|
"""
|
||||||
query = [Filter("id", "=", stix_id)]
|
query = [Filter("id", "=", stix_id)]
|
||||||
|
|
||||||
all_data = self.query(query=query, _composite_filters=_composite_filters)
|
all_data = self.query(query=query, _composite_filters=_composite_filters,
|
||||||
|
allow_custom=allow_custom, version=version)
|
||||||
|
|
||||||
if all_data:
|
if all_data:
|
||||||
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
||||||
stix_obj = parse(stix_obj, allow_custom, version)
|
|
||||||
else:
|
else:
|
||||||
stix_obj = None
|
stix_obj = None
|
||||||
|
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False, version=None):
|
||||||
"""retrieve STIX object from file directory via STIX ID, all versions
|
"""Retrieve STIX object from file directory via STIX ID, all versions.
|
||||||
|
|
||||||
Note: Since FileSystem sources/sinks don't handle multiple versions
|
Note: Since FileSystem sources/sinks don't handle multiple versions
|
||||||
of a STIX object, this operation is unnecessary. Pass call to get().
|
of a STIX object, this operation is unnecessary. Pass call to get().
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX objects to be retrieved.
|
stix_id (str): The STIX ID of the STIX objects to be retrieved.
|
||||||
|
_composite_filters (set): set of filters passed from the parent
|
||||||
composite_filters (set): set of filters passed from the parent
|
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||||
|
None, use latest version.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(list): of STIX objects that has the supplied STIX ID.
|
(list): of STIX objects that has the supplied STIX ID.
|
||||||
The STIX objects are loaded from their json files, parsed into
|
The STIX objects are loaded from their json files, parsed into
|
||||||
a python STIX objects and then returned
|
a python STIX objects and then returned
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
|
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters,
|
||||||
|
allow_custom=allow_custom, version=version)]
|
||||||
|
|
||||||
def query(self, query=None, _composite_filters=None, allow_custom=False, version=None):
|
def query(self, query=None, _composite_filters=None, allow_custom=False, version=None):
|
||||||
"""search and retrieve STIX objects based on the complete query
|
"""Search and retrieve STIX objects based on the complete query.
|
||||||
|
|
||||||
A "complete query" includes the filters from the query, the filters
|
A "complete query" includes the filters from the query, the filters
|
||||||
attached to MemorySource, and any filters passed from a
|
attached to this FileSystemSource, and any filters passed from a
|
||||||
CompositeDataSource (i.e. _composite_filters)
|
CompositeDataSource (i.e. _composite_filters).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
query (list): list of filters to search on.
|
query (list): list of filters to search on
|
||||||
_composite_filters (set): set of filters passed from the
|
_composite_filters (set): set of filters passed from the
|
||||||
CompositeDataSource, not user supplied.
|
CompositeDataSource, not user supplied
|
||||||
allow_custom (bool): Whether to allow custom properties or not.
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
Default: False.
|
or not. Default: False.
|
||||||
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
version (str): Which STIX2 version to use. (e.g. "2.0", "2.1"). If
|
||||||
None, use latest version.
|
None, use latest version.
|
||||||
|
|
||||||
|
@ -220,7 +230,7 @@ class FileSystemSource(DataSource):
|
||||||
if not isinstance(query, list):
|
if not isinstance(query, list):
|
||||||
# make sure dont make set from a Filter object,
|
# make sure dont make set from a Filter object,
|
||||||
# need to make a set from a list of Filter objects (even if just one Filter)
|
# need to make a set from a list of Filter objects (even if just one Filter)
|
||||||
query = list(query)
|
query = [query]
|
||||||
query = set(query)
|
query = set(query)
|
||||||
|
|
||||||
# combine all query filters
|
# combine all query filters
|
||||||
|
@ -265,8 +275,8 @@ class FileSystemSource(DataSource):
|
||||||
# so query will look in all STIX directories that are not
|
# so query will look in all STIX directories that are not
|
||||||
# the specified type. Compile correct dir paths
|
# the specified type. Compile correct dir paths
|
||||||
for dir in os.listdir(self._stix_dir):
|
for dir in os.listdir(self._stix_dir):
|
||||||
if os.path.abspath(dir) not in declude_paths:
|
if os.path.abspath(os.path.join(self._stix_dir, dir)) not in declude_paths:
|
||||||
include_paths.append(os.path.abspath(dir))
|
include_paths.append(os.path.abspath(os.path.join(self._stix_dir, dir)))
|
||||||
|
|
||||||
# grab stix object ID as well - if present in filters, as
|
# grab stix object ID as well - if present in filters, as
|
||||||
# may forgo the loading of STIX content into memory
|
# may forgo the loading of STIX content into memory
|
||||||
|
@ -284,15 +294,12 @@ class FileSystemSource(DataSource):
|
||||||
for path in include_paths:
|
for path in include_paths:
|
||||||
for root, dirs, files in os.walk(path):
|
for root, dirs, files in os.walk(path):
|
||||||
for file_ in files:
|
for file_ in files:
|
||||||
if id_:
|
if not id_ or id_ == file_.split(".")[0]:
|
||||||
if id_ == file_.split(".")[0]:
|
|
||||||
# since ID is specified in one of filters, can evaluate against filename first without loading
|
|
||||||
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
|
|
||||||
# check against other filters, add if match
|
|
||||||
all_data.extend(apply_common_filters([stix_obj], query))
|
|
||||||
else:
|
|
||||||
# have to load into memory regardless to evaluate other filters
|
# have to load into memory regardless to evaluate other filters
|
||||||
stix_obj = json.load(open(os.path.join(root, file_)))["objects"][0]
|
stix_obj = json.load(open(os.path.join(root, file_)))
|
||||||
|
if stix_obj.get('type', '') == 'bundle':
|
||||||
|
stix_obj = stix_obj['objects'][0]
|
||||||
|
# check against other filters, add if match
|
||||||
all_data.extend(apply_common_filters([stix_obj], query))
|
all_data.extend(apply_common_filters([stix_obj], query))
|
||||||
|
|
||||||
all_data = deduplicate(all_data)
|
all_data = deduplicate(all_data)
|
||||||
|
@ -303,15 +310,16 @@ class FileSystemSource(DataSource):
|
||||||
return stix_objs
|
return stix_objs
|
||||||
|
|
||||||
def _parse_file_filters(self, query):
|
def _parse_file_filters(self, query):
|
||||||
"""utility method to extract STIX common filters
|
"""Extract STIX common filters.
|
||||||
that can used to possibly speed up querying STIX objects
|
|
||||||
from the file system
|
Possibly speeds up querying STIX objects from the file system.
|
||||||
|
|
||||||
Extracts filters that are for the "id" and "type" field of
|
Extracts filters that are for the "id" and "type" field of
|
||||||
a STIX object. As the file directory is organized by STIX
|
a STIX object. As the file directory is organized by STIX
|
||||||
object type with filenames that are equivalent to the STIX
|
object type with filenames that are equivalent to the STIX
|
||||||
object ID, these filters can be used first to reduce the
|
object ID, these filters can be used first to reduce the
|
||||||
search space of a FileSystemStore(or FileSystemSink)
|
search space of a FileSystemStore (or FileSystemSink).
|
||||||
|
|
||||||
"""
|
"""
|
||||||
file_filters = set()
|
file_filters = set()
|
||||||
for filter_ in query:
|
for filter_ in query:
|
||||||
|
|
|
@ -24,16 +24,18 @@ from stix2.sources import DataSink, DataSource, DataStore
|
||||||
from stix2.sources.filters import Filter, apply_common_filters
|
from stix2.sources.filters import Filter, apply_common_filters
|
||||||
|
|
||||||
|
|
||||||
def _add(store, stix_data=None):
|
def _add(store, stix_data=None, allow_custom=False):
|
||||||
"""Adds STIX objects to MemoryStore/Sink.
|
"""Add STIX objects to MemoryStore/Sink.
|
||||||
|
|
||||||
Adds STIX objects to an in-memory dictionary for fast lookup.
|
Adds STIX objects to an in-memory dictionary for fast lookup.
|
||||||
Recursive function, breaks down STIX Bundles and lists.
|
Recursive function, breaks down STIX Bundles and lists.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_data (list OR dict OR STIX object): STIX objects to be added
|
stix_data (list OR dict OR STIX object): STIX objects to be added
|
||||||
"""
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
|
"""
|
||||||
if isinstance(stix_data, _STIXBase):
|
if isinstance(stix_data, _STIXBase):
|
||||||
# adding a python STIX object
|
# adding a python STIX object
|
||||||
store._data[stix_data["id"]] = stix_data
|
store._data[stix_data["id"]] = stix_data
|
||||||
|
@ -41,35 +43,35 @@ def _add(store, stix_data=None):
|
||||||
elif isinstance(stix_data, dict):
|
elif isinstance(stix_data, dict):
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
# adding a json bundle - so just grab STIX objects
|
# adding a json bundle - so just grab STIX objects
|
||||||
for stix_obj in stix_data["objects"]:
|
for stix_obj in stix_data.get("objects", []):
|
||||||
_add(store, stix_obj)
|
_add(store, stix_obj, allow_custom=allow_custom)
|
||||||
else:
|
else:
|
||||||
# adding a json STIX object
|
# adding a json STIX object
|
||||||
store._data[stix_data["id"]] = stix_data
|
store._data[stix_data["id"]] = stix_data
|
||||||
|
|
||||||
elif isinstance(stix_data, str):
|
elif isinstance(stix_data, str):
|
||||||
# adding json encoded string of STIX content
|
# adding json encoded string of STIX content
|
||||||
stix_data = parse(stix_data)
|
stix_data = parse(stix_data, allow_custom=allow_custom)
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
# recurse on each STIX object in bundle
|
# recurse on each STIX object in bundle
|
||||||
for stix_obj in stix_data:
|
for stix_obj in stix_data.get("objects", []):
|
||||||
_add(store, stix_obj)
|
_add(store, stix_obj, allow_custom=allow_custom)
|
||||||
else:
|
else:
|
||||||
_add(store, stix_data)
|
_add(store, stix_data)
|
||||||
|
|
||||||
elif isinstance(stix_data, list):
|
elif isinstance(stix_data, list):
|
||||||
# STIX objects are in a list- recurse on each object
|
# STIX objects are in a list- recurse on each object
|
||||||
for stix_obj in stix_data:
|
for stix_obj in stix_data:
|
||||||
_add(store, stix_obj)
|
_add(store, stix_obj, allow_custom=allow_custom)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
|
raise TypeError("stix_data must be a STIX object (or list of), JSON formatted STIX (or list of), or a JSON formatted STIX bundle")
|
||||||
|
|
||||||
|
|
||||||
class MemoryStore(DataStore):
|
class MemoryStore(DataStore):
|
||||||
"""Provides an interface to an in-memory dictionary
|
"""Interface to an in-memory dictionary of STIX objects.
|
||||||
of STIX objects. MemoryStore is a wrapper around a paired
|
|
||||||
MemorySink and MemorySource
|
MemoryStore is a wrapper around a paired MemorySink and MemorySource.
|
||||||
|
|
||||||
Note: It doesn't make sense to create a MemoryStore by passing
|
Note: It doesn't make sense to create a MemoryStore by passing
|
||||||
in existing MemorySource and MemorySink because there could
|
in existing MemorySource and MemorySink because there could
|
||||||
|
@ -77,36 +79,54 @@ class MemoryStore(DataStore):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_data (list OR dict OR STIX object): STIX content to be added
|
stix_data (list OR dict OR STIX object): STIX content to be added
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
_data (dict): the in-memory dict that holds STIX objects
|
_data (dict): the in-memory dict that holds STIX objects
|
||||||
|
|
||||||
source (MemorySource): MemorySource
|
source (MemorySource): MemorySource
|
||||||
|
|
||||||
sink (MemorySink): MemorySink
|
sink (MemorySink): MemorySink
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
def __init__(self, stix_data=None, allow_custom=False):
|
||||||
def __init__(self, stix_data=None):
|
|
||||||
super(MemoryStore, self).__init__()
|
super(MemoryStore, self).__init__()
|
||||||
self._data = {}
|
self._data = {}
|
||||||
|
|
||||||
if stix_data:
|
if stix_data:
|
||||||
_add(self, stix_data)
|
_add(self, stix_data, allow_custom=allow_custom)
|
||||||
|
|
||||||
self.source = MemorySource(stix_data=self._data, _store=True)
|
self.source = MemorySource(stix_data=self._data, _store=True, allow_custom=allow_custom)
|
||||||
self.sink = MemorySink(stix_data=self._data, _store=True)
|
self.sink = MemorySink(stix_data=self._data, _store=True, allow_custom=allow_custom)
|
||||||
|
|
||||||
def save_to_file(self, file_path):
|
def save_to_file(self, file_path, allow_custom=False):
|
||||||
return self.sink.save_to_file(file_path=file_path)
|
"""Write SITX objects from in-memory dictionary to JSON file, as a STIX
|
||||||
|
Bundle.
|
||||||
|
|
||||||
def load_from_file(self, file_path):
|
Args:
|
||||||
return self.source.load_from_file(file_path=file_path)
|
file_path (str): file path to write STIX data to
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self.sink.save_to_file(file_path=file_path, allow_custom=allow_custom)
|
||||||
|
|
||||||
|
def load_from_file(self, file_path, allow_custom=False):
|
||||||
|
"""Load STIX data from JSON file.
|
||||||
|
|
||||||
|
File format is expected to be a single JSON
|
||||||
|
STIX object or JSON STIX bundle.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path (str): file path to load STIX data from
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self.source.load_from_file(file_path=file_path, allow_custom=allow_custom)
|
||||||
|
|
||||||
|
|
||||||
class MemorySink(DataSink):
|
class MemorySink(DataSink):
|
||||||
"""Provides an interface for adding/pushing STIX objects
|
"""Interface for adding/pushing STIX objects to an in-memory dictionary.
|
||||||
to an in-memory dictionary.
|
|
||||||
|
|
||||||
Designed to be paired with a MemorySource, together as the two
|
Designed to be paired with a MemorySource, together as the two
|
||||||
components of a MemoryStore.
|
components of a MemoryStore.
|
||||||
|
@ -114,51 +134,43 @@ class MemorySink(DataSink):
|
||||||
Args:
|
Args:
|
||||||
stix_data (dict OR list): valid STIX 2.0 content in
|
stix_data (dict OR list): valid STIX 2.0 content in
|
||||||
bundle or a list.
|
bundle or a list.
|
||||||
|
|
||||||
_store (bool): if the MemorySink is a part of a DataStore,
|
_store (bool): if the MemorySink is a part of a DataStore,
|
||||||
in which case "stix_data" is a direct reference to
|
in which case "stix_data" is a direct reference to
|
||||||
shared memory with DataSource. Not user supplied
|
shared memory with DataSource. Not user supplied
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
_data (dict): the in-memory dict that holds STIX objects.
|
_data (dict): the in-memory dict that holds STIX objects.
|
||||||
If apart of a MemoryStore, dict is shared between with
|
If apart of a MemoryStore, dict is shared between with
|
||||||
a MemorySource
|
a MemorySource
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, stix_data=None, _store=False):
|
"""
|
||||||
|
def __init__(self, stix_data=None, _store=False, allow_custom=False):
|
||||||
super(MemorySink, self).__init__()
|
super(MemorySink, self).__init__()
|
||||||
self._data = {}
|
self._data = {}
|
||||||
|
|
||||||
if _store:
|
if _store:
|
||||||
self._data = stix_data
|
self._data = stix_data
|
||||||
elif stix_data:
|
elif stix_data:
|
||||||
_add(self, stix_data)
|
_add(self, stix_data, allow_custom=allow_custom)
|
||||||
|
|
||||||
def add(self, stix_data):
|
def add(self, stix_data, allow_custom=False):
|
||||||
"""add STIX objects to in-memory dictionary maintained by
|
_add(self, stix_data, allow_custom=allow_custom)
|
||||||
the MemorySink (MemoryStore)
|
add.__doc__ = _add.__doc__
|
||||||
|
|
||||||
see "_add()" for args documentation
|
def save_to_file(self, file_path, allow_custom=False):
|
||||||
"""
|
|
||||||
_add(self, stix_data)
|
|
||||||
|
|
||||||
def save_to_file(self, file_path):
|
|
||||||
"""write SITX objects in in-memory dictionary to json file, as a STIX Bundle
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): file path to write STIX data to
|
|
||||||
|
|
||||||
"""
|
|
||||||
file_path = os.path.abspath(file_path)
|
file_path = os.path.abspath(file_path)
|
||||||
if not os.path.exists(os.path.dirname(file_path)):
|
if not os.path.exists(os.path.dirname(file_path)):
|
||||||
os.makedirs(os.path.dirname(file_path))
|
os.makedirs(os.path.dirname(file_path))
|
||||||
with open(file_path, "w") as f:
|
with open(file_path, "w") as f:
|
||||||
f.write(str(Bundle(self._data.values())))
|
f.write(str(Bundle(self._data.values(), allow_custom=allow_custom)))
|
||||||
|
save_to_file.__doc__ = MemoryStore.save_to_file.__doc__
|
||||||
|
|
||||||
|
|
||||||
class MemorySource(DataSource):
|
class MemorySource(DataSource):
|
||||||
"""Provides an interface for searching/retrieving
|
"""Interface for searching/retrieving STIX objects from an in-memory
|
||||||
STIX objects from an in-memory dictionary.
|
dictionary.
|
||||||
|
|
||||||
Designed to be paired with a MemorySink, together as the two
|
Designed to be paired with a MemorySink, together as the two
|
||||||
components of a MemoryStore.
|
components of a MemoryStore.
|
||||||
|
@ -166,42 +178,44 @@ class MemorySource(DataSource):
|
||||||
Args:
|
Args:
|
||||||
stix_data (dict OR list OR STIX object): valid STIX 2.0 content in
|
stix_data (dict OR list OR STIX object): valid STIX 2.0 content in
|
||||||
bundle or list.
|
bundle or list.
|
||||||
|
|
||||||
_store (bool): if the MemorySource is a part of a DataStore,
|
_store (bool): if the MemorySource is a part of a DataStore,
|
||||||
in which case "stix_data" is a direct reference to shared
|
in which case "stix_data" is a direct reference to shared
|
||||||
memory with DataSink. Not user supplied
|
memory with DataSink. Not user supplied
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
_data (dict): the in-memory dict that holds STIX objects.
|
_data (dict): the in-memory dict that holds STIX objects.
|
||||||
If apart of a MemoryStore, dict is shared between with
|
If apart of a MemoryStore, dict is shared between with
|
||||||
a MemorySink
|
a MemorySink
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, stix_data=None, _store=False):
|
"""
|
||||||
|
def __init__(self, stix_data=None, _store=False, allow_custom=False):
|
||||||
super(MemorySource, self).__init__()
|
super(MemorySource, self).__init__()
|
||||||
self._data = {}
|
self._data = {}
|
||||||
|
|
||||||
if _store:
|
if _store:
|
||||||
self._data = stix_data
|
self._data = stix_data
|
||||||
elif stix_data:
|
elif stix_data:
|
||||||
_add(self, stix_data)
|
_add(self, stix_data, allow_custom=allow_custom)
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""retrieve STIX object from in-memory dict via STIX ID
|
"""Retrieve STIX object from in-memory dict via STIX ID.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
||||||
|
|
||||||
composite_filters (set): set of filters passed from the parent
|
composite_filters (set): set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(dict OR STIX object): STIX object that has the supplied
|
(dict OR STIX object): STIX object that has the supplied
|
||||||
ID. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
|
ID. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
|
||||||
as they are supplied (either as python dictionary or STIX object), it
|
as they are supplied (either as python dictionary or STIX object), it
|
||||||
is returned in the same form as it as added
|
is returned in the same form as it as added
|
||||||
"""
|
|
||||||
|
|
||||||
|
"""
|
||||||
if _composite_filters is None:
|
if _composite_filters is None:
|
||||||
# if get call is only based on 'id', no need to search, just retrieve from dict
|
# if get call is only based on 'id', no need to search, just retrieve from dict
|
||||||
try:
|
try:
|
||||||
|
@ -213,24 +227,28 @@ class MemorySource(DataSource):
|
||||||
# if there are filters from the composite level, process full query
|
# if there are filters from the composite level, process full query
|
||||||
query = [Filter("id", "=", stix_id)]
|
query = [Filter("id", "=", stix_id)]
|
||||||
|
|
||||||
all_data = self.query(query=query, _composite_filters=_composite_filters)
|
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
|
||||||
|
|
||||||
# reduce to most recent version
|
if all_data:
|
||||||
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
# reduce to most recent version
|
||||||
|
stix_obj = sorted(all_data, key=lambda k: k['modified'])[0]
|
||||||
|
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""retrieve STIX objects from in-memory dict via STIX ID, all versions of it
|
"""Retrieve STIX objects from in-memory dict via STIX ID, all versions of it
|
||||||
|
|
||||||
Note: Since Memory sources/sinks don't handle multiple versions of a
|
Note: Since Memory sources/sinks don't handle multiple versions of a
|
||||||
STIX object, this operation is unnecessary. Translate call to get().
|
STIX object, this operation is unnecessary. Translate call to get().
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX 2 object to retrieve.
|
stix_id (str): The STIX ID of the STIX 2 object to retrieve.
|
||||||
|
|
||||||
composite_filters (set): set of filters passed from the parent
|
composite_filters (set): set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(list): list of STIX objects that has the supplied ID. As the
|
(list): list of STIX objects that has the supplied ID. As the
|
||||||
|
@ -239,26 +257,27 @@ class MemorySource(DataSource):
|
||||||
is returned in the same form as it as added
|
is returned in the same form as it as added
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters)]
|
return [self.get(stix_id=stix_id, _composite_filters=_composite_filters, allow_custom=allow_custom)]
|
||||||
|
|
||||||
def query(self, query=None, _composite_filters=None):
|
def query(self, query=None, _composite_filters=None, allow_custom=False):
|
||||||
"""search and retrieve STIX objects based on the complete query
|
"""Search and retrieve STIX objects based on the complete query.
|
||||||
|
|
||||||
A "complete query" includes the filters from the query, the filters
|
A "complete query" includes the filters from the query, the filters
|
||||||
attached to MemorySource, and any filters passed from a
|
attached to this MemorySource, and any filters passed from a
|
||||||
CompositeDataSource (i.e. _composite_filters)
|
CompositeDataSource (i.e. _composite_filters).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
query (list): list of filters to search on
|
query (list): list of filters to search on
|
||||||
|
|
||||||
composite_filters (set): set of filters passed from the
|
composite_filters (set): set of filters passed from the
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(list): list of STIX objects that matches the supplied
|
(list): list of STIX objects that matches the supplied
|
||||||
query. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
|
query. As the MemoryStore(i.e. MemorySink) adds STIX objects to memory
|
||||||
as they are supplied (either as python dictionary or STIX object), it
|
as they are supplied (either as python dictionary or STIX object), it
|
||||||
is returned in the same form as it as added
|
is returned in the same form as it as added.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if query is None:
|
if query is None:
|
||||||
|
@ -267,7 +286,7 @@ class MemorySource(DataSource):
|
||||||
if not isinstance(query, list):
|
if not isinstance(query, list):
|
||||||
# make sure dont make set from a Filter object,
|
# make sure dont make set from a Filter object,
|
||||||
# need to make a set from a list of Filter objects (even if just one Filter)
|
# need to make a set from a list of Filter objects (even if just one Filter)
|
||||||
query = list(query)
|
query = [query]
|
||||||
query = set(query)
|
query = set(query)
|
||||||
|
|
||||||
# combine all query filters
|
# combine all query filters
|
||||||
|
@ -281,15 +300,8 @@ class MemorySource(DataSource):
|
||||||
|
|
||||||
return all_data
|
return all_data
|
||||||
|
|
||||||
def load_from_file(self, file_path):
|
def load_from_file(self, file_path, allow_custom=False):
|
||||||
"""load STIX data from json file
|
|
||||||
|
|
||||||
File format is expected to be a single json
|
|
||||||
STIX object or json STIX bundle
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): file path to load STIX data from
|
|
||||||
"""
|
|
||||||
file_path = os.path.abspath(file_path)
|
file_path = os.path.abspath(file_path)
|
||||||
stix_data = json.load(open(file_path, "r"))
|
stix_data = json.load(open(file_path, "r"))
|
||||||
_add(self, stix_data)
|
_add(self, stix_data, allow_custom=allow_custom)
|
||||||
|
load_from_file.__doc__ = MemoryStore.load_from_file.__doc__
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
"""
|
"""
|
||||||
Python STIX 2.0 TAXII Source/Sink
|
Python STIX 2.x TaxiiCollectionStore
|
||||||
|
|
||||||
TODO:
|
|
||||||
Test everything
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from stix2.base import _STIXBase
|
from stix2.base import _STIXBase
|
||||||
|
@ -21,7 +17,7 @@ class TAXIICollectionStore(DataStore):
|
||||||
around a paired TAXIICollectionSink and TAXIICollectionSource.
|
around a paired TAXIICollectionSink and TAXIICollectionSource.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
collection (taxii2.Collection): TAXII Collection instance
|
collection (taxii2.Collection): TAXII Collection instance
|
||||||
"""
|
"""
|
||||||
def __init__(self, collection):
|
def __init__(self, collection):
|
||||||
super(TAXIICollectionStore, self).__init__()
|
super(TAXIICollectionStore, self).__init__()
|
||||||
|
@ -41,39 +37,40 @@ class TAXIICollectionSink(DataSink):
|
||||||
super(TAXIICollectionSink, self).__init__()
|
super(TAXIICollectionSink, self).__init__()
|
||||||
self.collection = collection
|
self.collection = collection
|
||||||
|
|
||||||
def add(self, stix_data):
|
def add(self, stix_data, allow_custom=False):
|
||||||
"""add/push STIX content to TAXII Collection endpoint
|
"""Add/push STIX content to TAXII Collection endpoint
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
|
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
|
||||||
in a STIX object (or Bundle), STIX onject dict (or Bundle dict), or a STIX 2.0
|
in a STIX object (or Bundle), STIX onject dict (or Bundle dict), or a STIX 2.0
|
||||||
json encoded string, or list of any of the following
|
json encoded string, or list of any of the following
|
||||||
|
allow_custom (bool): whether to allow custom objects/properties or
|
||||||
|
not. Default: False.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if isinstance(stix_data, _STIXBase):
|
if isinstance(stix_data, _STIXBase):
|
||||||
# adding python STIX object
|
# adding python STIX object
|
||||||
bundle = dict(Bundle(stix_data))
|
bundle = dict(Bundle(stix_data, allow_custom=allow_custom))
|
||||||
|
|
||||||
elif isinstance(stix_data, dict):
|
elif isinstance(stix_data, dict):
|
||||||
# adding python dict (of either Bundle or STIX obj)
|
# adding python dict (of either Bundle or STIX obj)
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
bundle = stix_data
|
bundle = stix_data
|
||||||
else:
|
else:
|
||||||
bundle = dict(Bundle(stix_data))
|
bundle = dict(Bundle(stix_data, allow_custom=allow_custom))
|
||||||
|
|
||||||
elif isinstance(stix_data, list):
|
elif isinstance(stix_data, list):
|
||||||
# adding list of something - recurse on each
|
# adding list of something - recurse on each
|
||||||
for obj in stix_data:
|
for obj in stix_data:
|
||||||
self.add(obj)
|
self.add(obj, allow_custom=allow_custom)
|
||||||
|
|
||||||
elif isinstance(stix_data, str):
|
elif isinstance(stix_data, str):
|
||||||
# adding json encoded string of STIX content
|
# adding json encoded string of STIX content
|
||||||
stix_data = parse(stix_data)
|
stix_data = parse(stix_data, allow_custom=allow_custom)
|
||||||
if stix_data["type"] == "bundle":
|
if stix_data["type"] == "bundle":
|
||||||
bundle = dict(stix_data)
|
bundle = dict(stix_data)
|
||||||
else:
|
else:
|
||||||
bundle = dict(Bundle(stix_data))
|
bundle = dict(Bundle(stix_data, allow_custom=allow_custom))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
|
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
|
||||||
|
@ -93,22 +90,22 @@ class TAXIICollectionSource(DataSource):
|
||||||
super(TAXIICollectionSource, self).__init__()
|
super(TAXIICollectionSource, self).__init__()
|
||||||
self.collection = collection
|
self.collection = collection
|
||||||
|
|
||||||
def get(self, stix_id, _composite_filters=None):
|
def get(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""retrieve STIX object from local/remote STIX Collection
|
"""Retrieve STIX object from local/remote STIX Collection
|
||||||
endpoint.
|
endpoint.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
stix_id (str): The STIX ID of the STIX object to be retrieved.
|
||||||
|
|
||||||
composite_filters (set): set of filters passed from the parent
|
composite_filters (set): set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(STIX object): STIX object that has the supplied STIX ID.
|
(STIX object): STIX object that has the supplied STIX ID.
|
||||||
The STIX object is received from TAXII has dict, parsed into
|
The STIX object is received from TAXII has dict, parsed into
|
||||||
a python STIX object and then returned
|
a python STIX object and then returned
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# combine all query filters
|
# combine all query filters
|
||||||
query = set()
|
query = set()
|
||||||
|
@ -124,22 +121,25 @@ class TAXIICollectionSource(DataSource):
|
||||||
stix_obj = list(apply_common_filters(stix_objs, query))
|
stix_obj = list(apply_common_filters(stix_objs, query))
|
||||||
|
|
||||||
if len(stix_obj):
|
if len(stix_obj):
|
||||||
stix_obj = stix_obj[0]
|
stix_obj = parse(stix_obj[0], allow_custom=allow_custom)
|
||||||
stix_obj = parse(stix_obj)
|
if stix_obj.id != stix_id:
|
||||||
|
# check - was added to handle erroneous TAXII servers
|
||||||
|
stix_obj = None
|
||||||
else:
|
else:
|
||||||
stix_obj = None
|
stix_obj = None
|
||||||
|
|
||||||
return stix_obj
|
return stix_obj
|
||||||
|
|
||||||
def all_versions(self, stix_id, _composite_filters=None):
|
def all_versions(self, stix_id, _composite_filters=None, allow_custom=False):
|
||||||
"""retrieve STIX object from local/remote TAXII Collection
|
"""Retrieve STIX object from local/remote TAXII Collection
|
||||||
endpoint, all versions of it
|
endpoint, all versions of it
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_id (str): The STIX ID of the STIX objects to be retrieved.
|
stix_id (str): The STIX ID of the STIX objects to be retrieved.
|
||||||
|
|
||||||
composite_filters (set): set of filters passed from the parent
|
composite_filters (set): set of filters passed from the parent
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(see query() as all_versions() is just a wrapper)
|
(see query() as all_versions() is just a wrapper)
|
||||||
|
@ -151,12 +151,18 @@ class TAXIICollectionSource(DataSource):
|
||||||
Filter("match[version]", "=", "all")
|
Filter("match[version]", "=", "all")
|
||||||
]
|
]
|
||||||
|
|
||||||
all_data = self.query(query=query, _composite_filters=_composite_filters)
|
all_data = self.query(query=query, _composite_filters=_composite_filters, allow_custom=allow_custom)
|
||||||
|
|
||||||
return all_data
|
# parse STIX objects from TAXII returned json
|
||||||
|
all_data = [parse(stix_obj) for stix_obj in all_data]
|
||||||
|
|
||||||
def query(self, query=None, _composite_filters=None):
|
# check - was added to handle erroneous TAXII servers
|
||||||
"""search and retreive STIX objects based on the complete query
|
all_data_clean = [stix_obj for stix_obj in all_data if stix_obj.id == stix_id]
|
||||||
|
|
||||||
|
return all_data_clean
|
||||||
|
|
||||||
|
def query(self, query=None, _composite_filters=None, allow_custom=False):
|
||||||
|
"""Search and retreive STIX objects based on the complete query
|
||||||
|
|
||||||
A "complete query" includes the filters from the query, the filters
|
A "complete query" includes the filters from the query, the filters
|
||||||
attached to MemorySource, and any filters passed from a
|
attached to MemorySource, and any filters passed from a
|
||||||
|
@ -164,9 +170,10 @@ class TAXIICollectionSource(DataSource):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
query (list): list of filters to search on
|
query (list): list of filters to search on
|
||||||
|
|
||||||
composite_filters (set): set of filters passed from the
|
composite_filters (set): set of filters passed from the
|
||||||
CompositeDataSource, not user supplied
|
CompositeDataSource, not user supplied
|
||||||
|
allow_custom (bool): whether to retrieve custom objects/properties
|
||||||
|
or not. Default: False.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(list): list of STIX objects that matches the supplied
|
(list): list of STIX objects that matches the supplied
|
||||||
|
@ -174,14 +181,13 @@ class TAXIICollectionSource(DataSource):
|
||||||
parsed into python STIX objects and then returned.
|
parsed into python STIX objects and then returned.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if query is None:
|
if query is None:
|
||||||
query = set()
|
query = set()
|
||||||
else:
|
else:
|
||||||
if not isinstance(query, list):
|
if not isinstance(query, list):
|
||||||
# make sure dont make set from a Filter object,
|
# make sure dont make set from a Filter object,
|
||||||
# need to make a set from a list of Filter objects (even if just one Filter)
|
# need to make a set from a list of Filter objects (even if just one Filter)
|
||||||
query = list(query)
|
query = [query]
|
||||||
query = set(query)
|
query = set(query)
|
||||||
|
|
||||||
# combine all query filters
|
# combine all query filters
|
||||||
|
@ -194,7 +200,7 @@ class TAXIICollectionSource(DataSource):
|
||||||
taxii_filters = self._parse_taxii_filters(query)
|
taxii_filters = self._parse_taxii_filters(query)
|
||||||
|
|
||||||
# query TAXII collection
|
# query TAXII collection
|
||||||
all_data = self.collection.get_objects(filters=taxii_filters)["objects"]
|
all_data = self.collection.get_objects(filters=taxii_filters, allow_custom=allow_custom)["objects"]
|
||||||
|
|
||||||
# deduplicate data (before filtering as reduces wasted filtering)
|
# deduplicate data (before filtering as reduces wasted filtering)
|
||||||
all_data = deduplicate(all_data)
|
all_data = deduplicate(all_data)
|
||||||
|
@ -203,7 +209,7 @@ class TAXIICollectionSource(DataSource):
|
||||||
all_data = list(apply_common_filters(all_data, query))
|
all_data = list(apply_common_filters(all_data, query))
|
||||||
|
|
||||||
# parse python STIX objects from the STIX object dicts
|
# parse python STIX objects from the STIX object dicts
|
||||||
stix_objs = [parse(stix_obj_dict) for stix_obj_dict in all_data]
|
stix_objs = [parse(stix_obj_dict, allow_custom=allow_custom) for stix_obj_dict in all_data]
|
||||||
|
|
||||||
return stix_objs
|
return stix_objs
|
||||||
|
|
||||||
|
@ -225,7 +231,6 @@ class TAXIICollectionSource(DataSource):
|
||||||
for 'requests.get()'.
|
for 'requests.get()'.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
params = {}
|
params = {}
|
||||||
|
|
||||||
for filter_ in query:
|
for filter_ in query:
|
||||||
|
|
|
@ -1,16 +1,9 @@
|
||||||
{
|
{
|
||||||
"id": "bundle--2ed6ab6a-ca68-414f-8493-e4db8b75dd51",
|
"created": "2017-05-31T21:30:41.022744Z",
|
||||||
"objects": [
|
"created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
|
||||||
{
|
"description": "Identify unnecessary system utilities or potentially malicious software that may be used to collect data from a network share, and audit and/or block them by using whitelisting[[CiteRef::Beechey 2010]] tools, like AppLocker,[[CiteRef::Windows Commands JPCERT]][[CiteRef::NSA MS AppLocker]] or Software Restriction Policies[[CiteRef::Corio 2008]] where appropriate.[[CiteRef::TechNet Applocker vs SRP]]",
|
||||||
"created": "2017-05-31T21:30:41.022744Z",
|
"id": "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd",
|
||||||
"created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
|
"modified": "2017-05-31T21:30:41.022744Z",
|
||||||
"description": "Identify unnecessary system utilities or potentially malicious software that may be used to collect data from a network share, and audit and/or block them by using whitelisting[[CiteRef::Beechey 2010]] tools, like AppLocker,[[CiteRef::Windows Commands JPCERT]][[CiteRef::NSA MS AppLocker]] or Software Restriction Policies[[CiteRef::Corio 2008]] where appropriate.[[CiteRef::TechNet Applocker vs SRP]]",
|
"name": "Data from Network Shared Drive Mitigation",
|
||||||
"id": "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd",
|
"type": "course-of-action"
|
||||||
"modified": "2017-05-31T21:30:41.022744Z",
|
|
||||||
"name": "Data from Network Shared Drive Mitigation",
|
|
||||||
"type": "course-of-action"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"spec_version": "2.0",
|
|
||||||
"type": "bundle"
|
|
||||||
}
|
}
|
|
@ -159,3 +159,10 @@ def test_parse_unknown_type():
|
||||||
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
|
with pytest.raises(stix2.exceptions.ParseError) as excinfo:
|
||||||
stix2.parse(unknown)
|
stix2.parse(unknown)
|
||||||
assert str(excinfo.value) == "Can't parse unknown object type 'other'! For custom types, use the CustomObject decorator."
|
assert str(excinfo.value) == "Can't parse unknown object type 'other'! For custom types, use the CustomObject decorator."
|
||||||
|
|
||||||
|
|
||||||
|
def test_stix_object_property():
|
||||||
|
prop = stix2.core.STIXObjectProperty()
|
||||||
|
|
||||||
|
identity = stix2.Identity(name="test", identity_class="individual")
|
||||||
|
assert prop.clean(identity) is identity
|
||||||
|
|
|
@ -91,6 +91,7 @@ def test_custom_property_in_bundled_object():
|
||||||
bundle = stix2.Bundle(identity, allow_custom=True)
|
bundle = stix2.Bundle(identity, allow_custom=True)
|
||||||
|
|
||||||
assert bundle.objects[0].x_foo == "bar"
|
assert bundle.objects[0].x_foo == "bar"
|
||||||
|
assert '"x_foo": "bar"' in str(bundle)
|
||||||
|
|
||||||
|
|
||||||
@stix2.sdo.CustomObject('x-new-type', [
|
@stix2.sdo.CustomObject('x-new-type', [
|
||||||
|
|
|
@ -1,17 +1,13 @@
|
||||||
import os
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from taxii2client import Collection
|
from taxii2client import Collection
|
||||||
|
|
||||||
from stix2 import (Campaign, FileSystemSink, FileSystemSource, FileSystemStore,
|
from stix2 import Filter, MemorySource
|
||||||
Filter, MemorySource, MemoryStore)
|
|
||||||
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
|
from stix2.sources import (CompositeDataSource, DataSink, DataSource,
|
||||||
DataStore, make_id, taxii)
|
DataStore, make_id, taxii)
|
||||||
from stix2.sources.filters import apply_common_filters
|
from stix2.sources.filters import apply_common_filters
|
||||||
from stix2.utils import deduplicate
|
from stix2.utils import deduplicate
|
||||||
|
|
||||||
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
|
COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
|
||||||
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
|
||||||
|
|
||||||
|
|
||||||
class MockTAXIIClient(object):
|
class MockTAXIIClient(object):
|
||||||
|
@ -148,28 +144,6 @@ def test_ds_abstract_class_smoke():
|
||||||
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
|
ds3.query([Filter("id", "=", "malware--fdd60b30-b67c-11e3-b0b9-f01faf20d111")])
|
||||||
|
|
||||||
|
|
||||||
def test_memory_store_smoke():
|
|
||||||
# Initialize MemoryStore with dict
|
|
||||||
ms = MemoryStore(STIX_OBJS1)
|
|
||||||
|
|
||||||
# Add item to sink
|
|
||||||
ms.add(dict(id="bundle--%s" % make_id(),
|
|
||||||
objects=STIX_OBJS2,
|
|
||||||
spec_version="2.0",
|
|
||||||
type="bundle"))
|
|
||||||
|
|
||||||
resp = ms.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
|
||||||
assert len(resp) == 1
|
|
||||||
|
|
||||||
resp = ms.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
|
|
||||||
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
|
|
||||||
|
|
||||||
query = [Filter('type', '=', 'malware')]
|
|
||||||
|
|
||||||
resp = ms.query(query)
|
|
||||||
assert len(resp) == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_ds_taxii(collection):
|
def test_ds_taxii(collection):
|
||||||
ds = taxii.TAXIICollectionSource(collection)
|
ds = taxii.TAXIICollectionSource(collection)
|
||||||
assert ds.collection is not None
|
assert ds.collection is not None
|
||||||
|
@ -512,207 +486,3 @@ def test_composite_datasource_operations():
|
||||||
# STIX_OBJS2 has indicator with later time, one with different id, one with
|
# STIX_OBJS2 has indicator with later time, one with different id, one with
|
||||||
# original time in STIX_OBJS1
|
# original time in STIX_OBJS1
|
||||||
assert len(results) == 3
|
assert len(results) == 3
|
||||||
|
|
||||||
|
|
||||||
def test_filesytem_source():
|
|
||||||
# creation
|
|
||||||
fs_source = FileSystemSource(FS_PATH)
|
|
||||||
assert fs_source.stix_dir == FS_PATH
|
|
||||||
|
|
||||||
# get object
|
|
||||||
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
|
|
||||||
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
|
|
||||||
assert mal.name == "Rover"
|
|
||||||
|
|
||||||
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
|
|
||||||
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
|
|
||||||
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
|
|
||||||
assert id_.name == "The MITRE Corporation"
|
|
||||||
assert id_.type == "identity"
|
|
||||||
|
|
||||||
# query
|
|
||||||
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
|
|
||||||
assert len(intrusion_sets) == 2
|
|
||||||
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
|
|
||||||
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
|
|
||||||
|
|
||||||
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
|
|
||||||
assert "DragonOK" in is_1.aliases
|
|
||||||
assert len(is_1.external_references) == 4
|
|
||||||
|
|
||||||
# query2
|
|
||||||
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
|
|
||||||
assert len(is_2) == 1
|
|
||||||
|
|
||||||
is_2 = is_2[0]
|
|
||||||
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
|
|
||||||
assert is_2.type == "attack-pattern"
|
|
||||||
|
|
||||||
|
|
||||||
def test_filesystem_sink():
|
|
||||||
# creation
|
|
||||||
fs_sink = FileSystemSink(FS_PATH)
|
|
||||||
assert fs_sink.stix_dir == FS_PATH
|
|
||||||
|
|
||||||
fs_source = FileSystemSource(FS_PATH)
|
|
||||||
|
|
||||||
# Test all the ways stix objects can be added (via different supplied forms)
|
|
||||||
|
|
||||||
# add python stix object
|
|
||||||
camp1 = Campaign(name="Hannibal",
|
|
||||||
objective="Targeting Italian and Spanish Diplomat internet accounts",
|
|
||||||
aliases=["War Elephant"])
|
|
||||||
|
|
||||||
fs_sink.add(camp1)
|
|
||||||
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
|
|
||||||
|
|
||||||
camp1_r = fs_source.get(camp1.id)
|
|
||||||
assert camp1_r.id == camp1.id
|
|
||||||
assert camp1_r.name == "Hannibal"
|
|
||||||
assert "War Elephant" in camp1_r.aliases
|
|
||||||
|
|
||||||
# add stix object dict
|
|
||||||
camp2 = {
|
|
||||||
"name": "Aurelius",
|
|
||||||
"type": "campaign",
|
|
||||||
"objective": "German and French Intelligence Services",
|
|
||||||
"aliases": ["Purple Robes"],
|
|
||||||
"id": "campaign--111111b6-1112-4fb0-111b-b111107ca70a",
|
|
||||||
"created": "2017-05-31T21:31:53.197755Z"
|
|
||||||
}
|
|
||||||
|
|
||||||
fs_sink.add(camp2)
|
|
||||||
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
|
|
||||||
|
|
||||||
camp2_r = fs_source.get(camp2["id"])
|
|
||||||
assert camp2_r.id == camp2["id"]
|
|
||||||
assert camp2_r.name == camp2["name"]
|
|
||||||
assert "Purple Robes" in camp2_r.aliases
|
|
||||||
|
|
||||||
# add stix bundle dict
|
|
||||||
bund = {
|
|
||||||
"type": "bundle",
|
|
||||||
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
|
|
||||||
"spec_version": "2.0",
|
|
||||||
"objects": [
|
|
||||||
{
|
|
||||||
"name": "Atilla",
|
|
||||||
"type": "campaign",
|
|
||||||
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
|
|
||||||
"aliases": ["Huns"],
|
|
||||||
"id": "campaign--133111b6-1112-4fb0-111b-b111107ca70a",
|
|
||||||
"created": "2017-05-31T21:31:53.197755Z"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
fs_sink.add(bund)
|
|
||||||
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
|
|
||||||
|
|
||||||
camp3_r = fs_source.get(bund["objects"][0]["id"])
|
|
||||||
assert camp3_r.id == bund["objects"][0]["id"]
|
|
||||||
assert camp3_r.name == bund["objects"][0]["name"]
|
|
||||||
assert "Huns" in camp3_r.aliases
|
|
||||||
|
|
||||||
# add json-encoded stix obj
|
|
||||||
camp4 = '{"type": "campaign", "id":"campaign--144111b6-1112-4fb0-111b-b111107ca70a",'\
|
|
||||||
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
|
|
||||||
|
|
||||||
fs_sink.add(camp4)
|
|
||||||
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--144111b6-1112-4fb0-111b-b111107ca70a" + ".json"))
|
|
||||||
|
|
||||||
camp4_r = fs_source.get("campaign--144111b6-1112-4fb0-111b-b111107ca70a")
|
|
||||||
assert camp4_r.id == "campaign--144111b6-1112-4fb0-111b-b111107ca70a"
|
|
||||||
assert camp4_r.name == "Ghengis Khan"
|
|
||||||
|
|
||||||
# add json-encoded stix bundle
|
|
||||||
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
|
|
||||||
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
|
|
||||||
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
|
|
||||||
fs_sink.add(bund2)
|
|
||||||
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--155155b6-1112-4fb0-111b-b111107ca70a" + ".json"))
|
|
||||||
|
|
||||||
camp5_r = fs_source.get("campaign--155155b6-1112-4fb0-111b-b111107ca70a")
|
|
||||||
assert camp5_r.id == "campaign--155155b6-1112-4fb0-111b-b111107ca70a"
|
|
||||||
assert camp5_r.name == "Spartacus"
|
|
||||||
|
|
||||||
# add list of objects
|
|
||||||
camp6 = Campaign(name="Comanche",
|
|
||||||
objective="US Midwest manufacturing firms, oil refineries, and businesses",
|
|
||||||
aliases=["Horse Warrior"])
|
|
||||||
|
|
||||||
camp7 = {
|
|
||||||
"name": "Napolean",
|
|
||||||
"type": "campaign",
|
|
||||||
"objective": "Central and Eastern Europe military commands and departments",
|
|
||||||
"aliases": ["The Frenchmen"],
|
|
||||||
"id": "campaign--122818b6-1112-4fb0-111b-b111107ca70a",
|
|
||||||
"created": "2017-05-31T21:31:53.197755Z"
|
|
||||||
}
|
|
||||||
|
|
||||||
fs_sink.add([camp6, camp7])
|
|
||||||
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
|
|
||||||
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-111b-b111107ca70a" + ".json"))
|
|
||||||
|
|
||||||
camp6_r = fs_source.get(camp6.id)
|
|
||||||
assert camp6_r.id == camp6.id
|
|
||||||
assert "Horse Warrior" in camp6_r.aliases
|
|
||||||
|
|
||||||
camp7_r = fs_source.get(camp7["id"])
|
|
||||||
assert camp7_r.id == camp7["id"]
|
|
||||||
assert "The Frenchmen" in camp7_r.aliases
|
|
||||||
|
|
||||||
# remove all added objects
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
|
|
||||||
|
|
||||||
# remove campaign dir (that was added in course of testing)
|
|
||||||
os.rmdir(os.path.join(FS_PATH, "campaign"))
|
|
||||||
|
|
||||||
|
|
||||||
def test_filesystem_store():
|
|
||||||
# creation
|
|
||||||
fs_store = FileSystemStore(FS_PATH)
|
|
||||||
|
|
||||||
# get()
|
|
||||||
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
|
|
||||||
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
|
|
||||||
assert coa.type == "course-of-action"
|
|
||||||
|
|
||||||
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
|
|
||||||
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
|
|
||||||
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
|
|
||||||
assert rel.type == "relationship"
|
|
||||||
|
|
||||||
# query()
|
|
||||||
tools = fs_store.query([Filter("labels", "in", "tool")])
|
|
||||||
assert len(tools) == 2
|
|
||||||
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
|
|
||||||
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
|
|
||||||
|
|
||||||
# add()
|
|
||||||
camp1 = Campaign(name="Great Heathen Army",
|
|
||||||
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
|
|
||||||
aliases=["Ragnar"])
|
|
||||||
fs_store.add(camp1)
|
|
||||||
|
|
||||||
camp1_r = fs_store.get(camp1.id)
|
|
||||||
assert camp1_r.id == camp1.id
|
|
||||||
assert camp1_r.name == camp1.name
|
|
||||||
|
|
||||||
# remove
|
|
||||||
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
|
||||||
|
|
||||||
# remove campaign dir
|
|
||||||
os.rmdir(os.path.join(FS_PATH, "campaign"))
|
|
||||||
|
|
|
@ -184,3 +184,35 @@ def test_parse_malware():
|
||||||
assert mal.modified == FAKE_TIME
|
assert mal.modified == FAKE_TIME
|
||||||
assert mal.labels == ['ransomware']
|
assert mal.labels == ['ransomware']
|
||||||
assert mal.name == "Cryptolocker"
|
assert mal.name == "Cryptolocker"
|
||||||
|
|
||||||
|
|
||||||
|
def test_created_by():
|
||||||
|
identity = stix2.Identity(**IDENTITY_KWARGS)
|
||||||
|
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
||||||
|
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
|
||||||
|
env.add(identity)
|
||||||
|
|
||||||
|
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
|
||||||
|
creator = env.creator_of(ind)
|
||||||
|
assert creator is identity
|
||||||
|
|
||||||
|
|
||||||
|
def test_created_by_no_datasource():
|
||||||
|
identity = stix2.Identity(**IDENTITY_KWARGS)
|
||||||
|
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
||||||
|
env = stix2.Environment(factory=factory)
|
||||||
|
|
||||||
|
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
|
||||||
|
with pytest.raises(AttributeError) as excinfo:
|
||||||
|
env.creator_of(ind)
|
||||||
|
assert 'Environment has no data source' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_created_by_not_found():
|
||||||
|
identity = stix2.Identity(**IDENTITY_KWARGS)
|
||||||
|
factory = stix2.ObjectFactory(created_by_ref=identity.id)
|
||||||
|
env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
|
||||||
|
|
||||||
|
ind = env.create(stix2.Indicator, **INDICATOR_KWARGS)
|
||||||
|
creator = env.creator_of(ind)
|
||||||
|
assert creator is None
|
||||||
|
|
|
@ -0,0 +1,377 @@
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from stix2 import (Bundle, Campaign, CustomObject, FileSystemSink,
|
||||||
|
FileSystemSource, FileSystemStore, Filter, properties)
|
||||||
|
|
||||||
|
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fs_store():
|
||||||
|
# create
|
||||||
|
yield FileSystemStore(FS_PATH)
|
||||||
|
|
||||||
|
# remove campaign dir
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fs_source():
|
||||||
|
# create
|
||||||
|
fs = FileSystemSource(FS_PATH)
|
||||||
|
assert fs.stix_dir == FS_PATH
|
||||||
|
yield fs
|
||||||
|
|
||||||
|
# remove campaign dir
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fs_sink():
|
||||||
|
# create
|
||||||
|
fs = FileSystemSink(FS_PATH)
|
||||||
|
assert fs.stix_dir == FS_PATH
|
||||||
|
yield fs
|
||||||
|
|
||||||
|
# remove campaign dir
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_source_nonexistent_folder():
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
FileSystemSource('nonexistent-folder')
|
||||||
|
assert "for STIX data does not exist" in str(excinfo)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_nonexistent_folder():
|
||||||
|
with pytest.raises(ValueError) as excinfo:
|
||||||
|
FileSystemSink('nonexistent-folder')
|
||||||
|
assert "for STIX data does not exist" in str(excinfo)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesytem_source_get_object(fs_source):
|
||||||
|
# get object
|
||||||
|
mal = fs_source.get("malware--6b616fc1-1505-48e3-8b2c-0d19337bff38")
|
||||||
|
assert mal.id == "malware--6b616fc1-1505-48e3-8b2c-0d19337bff38"
|
||||||
|
assert mal.name == "Rover"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesytem_source_get_nonexistent_object(fs_source):
|
||||||
|
ind = fs_source.get("indicator--6b616fc1-1505-48e3-8b2c-0d19337bff38")
|
||||||
|
assert ind is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesytem_source_all_versions(fs_source):
|
||||||
|
# all versions - (currently not a true all versions call as FileSystem cant have multiple versions)
|
||||||
|
id_ = fs_source.get("identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5")
|
||||||
|
assert id_.id == "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5"
|
||||||
|
assert id_.name == "The MITRE Corporation"
|
||||||
|
assert id_.type == "identity"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesytem_source_query_single(fs_source):
|
||||||
|
# query2
|
||||||
|
is_2 = fs_source.query([Filter("external_references.external_id", '=', "T1027")])
|
||||||
|
assert len(is_2) == 1
|
||||||
|
|
||||||
|
is_2 = is_2[0]
|
||||||
|
assert is_2.id == "attack-pattern--b3d682b6-98f2-4fb0-aa3b-b4df007ca70a"
|
||||||
|
assert is_2.type == "attack-pattern"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesytem_source_query_multiple(fs_source):
|
||||||
|
# query
|
||||||
|
intrusion_sets = fs_source.query([Filter("type", '=', "intrusion-set")])
|
||||||
|
assert len(intrusion_sets) == 2
|
||||||
|
assert "intrusion-set--a653431d-6a5e-4600-8ad3-609b5af57064" in [is_.id for is_ in intrusion_sets]
|
||||||
|
assert "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a" in [is_.id for is_ in intrusion_sets]
|
||||||
|
|
||||||
|
is_1 = [is_ for is_ in intrusion_sets if is_.id == "intrusion-set--f3bdec95-3d62-42d9-a840-29630f6cdc1a"][0]
|
||||||
|
assert "DragonOK" in is_1.aliases
|
||||||
|
assert len(is_1.external_references) == 4
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_add_python_stix_object(fs_sink, fs_source):
|
||||||
|
# add python stix object
|
||||||
|
camp1 = Campaign(name="Hannibal",
|
||||||
|
objective="Targeting Italian and Spanish Diplomat internet accounts",
|
||||||
|
aliases=["War Elephant"])
|
||||||
|
|
||||||
|
fs_sink.add(camp1)
|
||||||
|
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp1.id + ".json"))
|
||||||
|
|
||||||
|
camp1_r = fs_source.get(camp1.id)
|
||||||
|
assert camp1_r.id == camp1.id
|
||||||
|
assert camp1_r.name == "Hannibal"
|
||||||
|
assert "War Elephant" in camp1_r.aliases
|
||||||
|
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_add_stix_object_dict(fs_sink, fs_source):
|
||||||
|
# add stix object dict
|
||||||
|
camp2 = {
|
||||||
|
"name": "Aurelius",
|
||||||
|
"type": "campaign",
|
||||||
|
"objective": "German and French Intelligence Services",
|
||||||
|
"aliases": ["Purple Robes"],
|
||||||
|
"id": "campaign--111111b6-1112-4fb0-111b-b111107ca70a",
|
||||||
|
"created": "2017-05-31T21:31:53.197755Z"
|
||||||
|
}
|
||||||
|
|
||||||
|
fs_sink.add(camp2)
|
||||||
|
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp2["id"] + ".json"))
|
||||||
|
|
||||||
|
camp2_r = fs_source.get(camp2["id"])
|
||||||
|
assert camp2_r.id == camp2["id"]
|
||||||
|
assert camp2_r.name == camp2["name"]
|
||||||
|
assert "Purple Robes" in camp2_r.aliases
|
||||||
|
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp2_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_add_stix_bundle_dict(fs_sink, fs_source):
|
||||||
|
# add stix bundle dict
|
||||||
|
bund = {
|
||||||
|
"type": "bundle",
|
||||||
|
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
|
||||||
|
"spec_version": "2.0",
|
||||||
|
"objects": [
|
||||||
|
{
|
||||||
|
"name": "Atilla",
|
||||||
|
"type": "campaign",
|
||||||
|
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
|
||||||
|
"aliases": ["Huns"],
|
||||||
|
"id": "campaign--133111b6-1112-4fb0-111b-b111107ca70a",
|
||||||
|
"created": "2017-05-31T21:31:53.197755Z"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
fs_sink.add(bund)
|
||||||
|
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", bund["objects"][0]["id"] + ".json"))
|
||||||
|
|
||||||
|
camp3_r = fs_source.get(bund["objects"][0]["id"])
|
||||||
|
assert camp3_r.id == bund["objects"][0]["id"]
|
||||||
|
assert camp3_r.name == bund["objects"][0]["name"]
|
||||||
|
assert "Huns" in camp3_r.aliases
|
||||||
|
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp3_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_add_json_stix_object(fs_sink, fs_source):
|
||||||
|
# add json-encoded stix obj
|
||||||
|
camp4 = '{"type": "campaign", "id":"campaign--144111b6-1112-4fb0-111b-b111107ca70a",'\
|
||||||
|
' "created":"2017-05-31T21:31:53.197755Z", "name": "Ghengis Khan", "objective": "China and Russian infrastructure"}'
|
||||||
|
|
||||||
|
fs_sink.add(camp4)
|
||||||
|
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--144111b6-1112-4fb0-111b-b111107ca70a" + ".json"))
|
||||||
|
|
||||||
|
camp4_r = fs_source.get("campaign--144111b6-1112-4fb0-111b-b111107ca70a")
|
||||||
|
assert camp4_r.id == "campaign--144111b6-1112-4fb0-111b-b111107ca70a"
|
||||||
|
assert camp4_r.name == "Ghengis Khan"
|
||||||
|
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp4_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_json_stix_bundle(fs_sink, fs_source):
|
||||||
|
# add json-encoded stix bundle
|
||||||
|
bund2 = '{"type": "bundle", "id": "bundle--332211b6-1132-4fb0-111b-b111107ca70a",' \
|
||||||
|
' "spec_version": "2.0", "objects": [{"type": "campaign", "id": "campaign--155155b6-1112-4fb0-111b-b111107ca70a",' \
|
||||||
|
' "created":"2017-05-31T21:31:53.197755Z", "name": "Spartacus", "objective": "Oppressive regimes of Africa and Middle East"}]}'
|
||||||
|
fs_sink.add(bund2)
|
||||||
|
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--155155b6-1112-4fb0-111b-b111107ca70a" + ".json"))
|
||||||
|
|
||||||
|
camp5_r = fs_source.get("campaign--155155b6-1112-4fb0-111b-b111107ca70a")
|
||||||
|
assert camp5_r.id == "campaign--155155b6-1112-4fb0-111b-b111107ca70a"
|
||||||
|
assert camp5_r.name == "Spartacus"
|
||||||
|
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp5_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_sink_add_objects_list(fs_sink, fs_source):
|
||||||
|
# add list of objects
|
||||||
|
camp6 = Campaign(name="Comanche",
|
||||||
|
objective="US Midwest manufacturing firms, oil refineries, and businesses",
|
||||||
|
aliases=["Horse Warrior"])
|
||||||
|
|
||||||
|
camp7 = {
|
||||||
|
"name": "Napolean",
|
||||||
|
"type": "campaign",
|
||||||
|
"objective": "Central and Eastern Europe military commands and departments",
|
||||||
|
"aliases": ["The Frenchmen"],
|
||||||
|
"id": "campaign--122818b6-1112-4fb0-111b-b111107ca70a",
|
||||||
|
"created": "2017-05-31T21:31:53.197755Z"
|
||||||
|
}
|
||||||
|
|
||||||
|
fs_sink.add([camp6, camp7])
|
||||||
|
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", camp6.id + ".json"))
|
||||||
|
assert os.path.exists(os.path.join(FS_PATH, "campaign", "campaign--122818b6-1112-4fb0-111b-b111107ca70a" + ".json"))
|
||||||
|
|
||||||
|
camp6_r = fs_source.get(camp6.id)
|
||||||
|
assert camp6_r.id == camp6.id
|
||||||
|
assert "Horse Warrior" in camp6_r.aliases
|
||||||
|
|
||||||
|
camp7_r = fs_source.get(camp7["id"])
|
||||||
|
assert camp7_r.id == camp7["id"]
|
||||||
|
assert "The Frenchmen" in camp7_r.aliases
|
||||||
|
|
||||||
|
# remove all added objects
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp6_r.id + ".json"))
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp7_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_get_stored_as_bundle(fs_store):
|
||||||
|
coa = fs_store.get("course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f")
|
||||||
|
assert coa.id == "course-of-action--95ddb356-7ba0-4bd9-a889-247262b8946f"
|
||||||
|
assert coa.type == "course-of-action"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_get_stored_as_object(fs_store):
|
||||||
|
coa = fs_store.get("course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd")
|
||||||
|
assert coa.id == "course-of-action--d9727aee-48b8-4fdb-89e2-4c49746ba4dd"
|
||||||
|
assert coa.type == "course-of-action"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_all_versions(fs_store):
|
||||||
|
# all versions() - (note at this time, all_versions() is still not applicable to FileSystem, as only one version is ever stored)
|
||||||
|
rel = fs_store.all_versions("relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1")[0]
|
||||||
|
assert rel.id == "relationship--70dc6b5c-c524-429e-a6ab-0dd40f0482c1"
|
||||||
|
assert rel.type == "relationship"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_query(fs_store):
|
||||||
|
# query()
|
||||||
|
tools = fs_store.query([Filter("labels", "in", "tool")])
|
||||||
|
assert len(tools) == 2
|
||||||
|
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
|
||||||
|
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_query_single_filter(fs_store):
|
||||||
|
query = Filter("labels", "in", "tool")
|
||||||
|
tools = fs_store.query(query)
|
||||||
|
assert len(tools) == 2
|
||||||
|
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [tool.id for tool in tools]
|
||||||
|
assert "tool--03342581-f790-4f03-ba41-e82e67392e23" in [tool.id for tool in tools]
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_empty_query(fs_store):
|
||||||
|
results = fs_store.query() # returns all
|
||||||
|
assert len(results) == 26
|
||||||
|
assert "tool--242f3da3-4425-4d11-8f5c-b842886da966" in [obj.id for obj in results]
|
||||||
|
assert "marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168" in [obj.id for obj in results]
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_query_multiple_filters(fs_store):
|
||||||
|
fs_store.source.filters.add(Filter("labels", "in", "tool"))
|
||||||
|
tools = fs_store.query(Filter("id", "=", "tool--242f3da3-4425-4d11-8f5c-b842886da966"))
|
||||||
|
assert len(tools) == 1
|
||||||
|
assert tools[0].id == "tool--242f3da3-4425-4d11-8f5c-b842886da966"
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_query_dont_include_type_folder(fs_store):
|
||||||
|
results = fs_store.query(Filter("type", "!=", "tool"))
|
||||||
|
assert len(results) == 24
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_add(fs_store):
|
||||||
|
# add()
|
||||||
|
camp1 = Campaign(name="Great Heathen Army",
|
||||||
|
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
|
||||||
|
aliases=["Ragnar"])
|
||||||
|
fs_store.add(camp1)
|
||||||
|
|
||||||
|
camp1_r = fs_store.get(camp1.id)
|
||||||
|
assert camp1_r.id == camp1.id
|
||||||
|
assert camp1_r.name == camp1.name
|
||||||
|
|
||||||
|
# remove
|
||||||
|
os.remove(os.path.join(FS_PATH, "campaign", camp1_r.id + ".json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_add_as_bundle():
|
||||||
|
fs_store = FileSystemStore(FS_PATH, bundlify=True)
|
||||||
|
|
||||||
|
camp1 = Campaign(name="Great Heathen Army",
|
||||||
|
objective="Targeting the government of United Kingdom and insitutions affiliated with the Church Of England",
|
||||||
|
aliases=["Ragnar"])
|
||||||
|
fs_store.add(camp1)
|
||||||
|
|
||||||
|
with open(os.path.join(FS_PATH, "campaign", camp1.id + ".json")) as bundle_file:
|
||||||
|
assert '"type": "bundle"' in bundle_file.read()
|
||||||
|
|
||||||
|
camp1_r = fs_store.get(camp1.id)
|
||||||
|
assert camp1_r.id == camp1.id
|
||||||
|
assert camp1_r.name == camp1.name
|
||||||
|
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "campaign"), True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_add_bundle_object(fs_store):
|
||||||
|
bundle = Bundle()
|
||||||
|
fs_store.add(bundle)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_store_add_invalid_object(fs_store):
|
||||||
|
ind = ('campaign', 'campaign--111111b6-1112-4fb0-111b-b111107ca70a') # tuple isn't valid
|
||||||
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
fs_store.add(ind)
|
||||||
|
assert 'stix_data must be' in str(excinfo.value)
|
||||||
|
assert 'a STIX object' in str(excinfo.value)
|
||||||
|
assert 'JSON formatted STIX' in str(excinfo.value)
|
||||||
|
assert 'JSON formatted STIX bundle' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_object_with_custom_property(fs_store):
|
||||||
|
camp = Campaign(name="Scipio Africanus",
|
||||||
|
objective="Defeat the Carthaginians",
|
||||||
|
x_empire="Roman",
|
||||||
|
allow_custom=True)
|
||||||
|
|
||||||
|
fs_store.add(camp, True)
|
||||||
|
|
||||||
|
camp_r = fs_store.get(camp.id, True)
|
||||||
|
assert camp_r.id == camp.id
|
||||||
|
assert camp_r.x_empire == camp.x_empire
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_object_with_custom_property_in_bundle(fs_store):
|
||||||
|
camp = Campaign(name="Scipio Africanus",
|
||||||
|
objective="Defeat the Carthaginians",
|
||||||
|
x_empire="Roman",
|
||||||
|
allow_custom=True)
|
||||||
|
|
||||||
|
bundle = Bundle(camp, allow_custom=True)
|
||||||
|
fs_store.add(bundle, True)
|
||||||
|
|
||||||
|
camp_r = fs_store.get(camp.id, True)
|
||||||
|
assert camp_r.id == camp.id
|
||||||
|
assert camp_r.x_empire == camp.x_empire
|
||||||
|
|
||||||
|
|
||||||
|
def test_filesystem_custom_object(fs_store):
|
||||||
|
@CustomObject('x-new-obj', [
|
||||||
|
('property1', properties.StringProperty(required=True)),
|
||||||
|
])
|
||||||
|
class NewObj():
|
||||||
|
pass
|
||||||
|
|
||||||
|
newobj = NewObj(property1='something')
|
||||||
|
fs_store.add(newobj, True)
|
||||||
|
|
||||||
|
newobj_r = fs_store.get(newobj.id, True)
|
||||||
|
assert newobj_r.id == newobj.id
|
||||||
|
assert newobj_r.property1 == 'something'
|
||||||
|
|
||||||
|
# remove dir
|
||||||
|
shutil.rmtree(os.path.join(FS_PATH, "x-new-obj"), True)
|
|
@ -0,0 +1,270 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from stix2 import (Bundle, Campaign, CustomObject, Filter, MemorySource,
|
||||||
|
MemoryStore, properties)
|
||||||
|
from stix2.sources import make_id
|
||||||
|
|
||||||
|
IND1 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND2 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND3 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.936Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND4 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND5 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND6 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-31T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND7 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
IND8 = {
|
||||||
|
"created": "2017-01-27T13:49:53.935Z",
|
||||||
|
"id": "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f",
|
||||||
|
"labels": [
|
||||||
|
"url-watchlist"
|
||||||
|
],
|
||||||
|
"modified": "2017-01-27T13:49:53.935Z",
|
||||||
|
"name": "Malicious site hosting downloader",
|
||||||
|
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
|
||||||
|
"type": "indicator",
|
||||||
|
"valid_from": "2017-01-27T13:49:53.935382Z"
|
||||||
|
}
|
||||||
|
|
||||||
|
STIX_OBJS2 = [IND6, IND7, IND8]
|
||||||
|
STIX_OBJS1 = [IND1, IND2, IND3, IND4, IND5]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mem_store():
|
||||||
|
yield MemoryStore(STIX_OBJS1)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mem_source():
|
||||||
|
yield MemorySource(STIX_OBJS1)
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_source_get(mem_source):
|
||||||
|
resp = mem_source.get("indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
|
||||||
|
assert resp["id"] == "indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f"
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_source_get_nonexistant_object(mem_source):
|
||||||
|
resp = mem_source.get("tool--d81f86b8-975b-bc0b-775e-810c5ad45a4f")
|
||||||
|
assert resp is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_all_versions(mem_store):
|
||||||
|
# Add bundle of items to sink
|
||||||
|
mem_store.add(dict(id="bundle--%s" % make_id(),
|
||||||
|
objects=STIX_OBJS2,
|
||||||
|
spec_version="2.0",
|
||||||
|
type="bundle"))
|
||||||
|
|
||||||
|
resp = mem_store.all_versions("indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f")
|
||||||
|
assert len(resp) == 1 # MemoryStore can only store 1 version of each object
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_query(mem_store):
|
||||||
|
query = [Filter('type', '=', 'malware')]
|
||||||
|
resp = mem_store.query(query)
|
||||||
|
assert len(resp) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_query_single_filter(mem_store):
|
||||||
|
query = Filter('id', '=', 'indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f')
|
||||||
|
resp = mem_store.query(query)
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_query_empty_query(mem_store):
|
||||||
|
resp = mem_store.query()
|
||||||
|
# sort since returned in random order
|
||||||
|
resp = sorted(resp, key=lambda k: k['id'])
|
||||||
|
assert len(resp) == 2
|
||||||
|
assert resp[0]['id'] == 'indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f'
|
||||||
|
assert resp[0]['modified'] == '2017-01-27T13:49:53.935Z'
|
||||||
|
assert resp[1]['id'] == 'indicator--d81f86b9-975b-bc0b-775e-810c5ad45a4f'
|
||||||
|
assert resp[1]['modified'] == '2017-01-27T13:49:53.936Z'
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_query_multiple_filters(mem_store):
|
||||||
|
mem_store.source.filters.add(Filter('type', '=', 'indicator'))
|
||||||
|
query = Filter('id', '=', 'indicator--d81f86b8-975b-bc0b-775e-810c5ad45a4f')
|
||||||
|
resp = mem_store.query(query)
|
||||||
|
assert len(resp) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_add_stix_object_str(mem_store):
|
||||||
|
# add stix object string
|
||||||
|
camp_id = "campaign--111111b6-1112-4fb0-111b-b111107ca70a"
|
||||||
|
camp_name = "Aurelius"
|
||||||
|
camp_alias = "Purple Robes"
|
||||||
|
camp = """{
|
||||||
|
"name": "%s",
|
||||||
|
"type": "campaign",
|
||||||
|
"objective": "German and French Intelligence Services",
|
||||||
|
"aliases": ["%s"],
|
||||||
|
"id": "%s",
|
||||||
|
"created": "2017-05-31T21:31:53.197755Z"
|
||||||
|
}""" % (camp_name, camp_alias, camp_id)
|
||||||
|
|
||||||
|
mem_store.add(camp)
|
||||||
|
|
||||||
|
camp_r = mem_store.get(camp_id)
|
||||||
|
assert camp_r["id"] == camp_id
|
||||||
|
assert camp_r["name"] == camp_name
|
||||||
|
assert camp_alias in camp_r["aliases"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_add_stix_bundle_str(mem_store):
|
||||||
|
# add stix bundle string
|
||||||
|
camp_id = "campaign--133111b6-1112-4fb0-111b-b111107ca70a"
|
||||||
|
camp_name = "Atilla"
|
||||||
|
camp_alias = "Huns"
|
||||||
|
bund = """{
|
||||||
|
"type": "bundle",
|
||||||
|
"id": "bundle--112211b6-1112-4fb0-111b-b111107ca70a",
|
||||||
|
"spec_version": "2.0",
|
||||||
|
"objects": [
|
||||||
|
{
|
||||||
|
"name": "%s",
|
||||||
|
"type": "campaign",
|
||||||
|
"objective": "Bulgarian, Albanian and Romanian Intelligence Services",
|
||||||
|
"aliases": ["%s"],
|
||||||
|
"id": "%s",
|
||||||
|
"created": "2017-05-31T21:31:53.197755Z"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}""" % (camp_name, camp_alias, camp_id)
|
||||||
|
|
||||||
|
mem_store.add(bund)
|
||||||
|
|
||||||
|
camp_r = mem_store.get(camp_id)
|
||||||
|
assert camp_r["id"] == camp_id
|
||||||
|
assert camp_r["name"] == camp_name
|
||||||
|
assert camp_alias in camp_r["aliases"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_add_invalid_object(mem_store):
|
||||||
|
ind = ('indicator', IND1) # tuple isn't valid
|
||||||
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
mem_store.add(ind)
|
||||||
|
assert 'stix_data must be' in str(excinfo.value)
|
||||||
|
assert 'a STIX object' in str(excinfo.value)
|
||||||
|
assert 'JSON formatted STIX' in str(excinfo.value)
|
||||||
|
assert 'JSON formatted STIX bundle' in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_object_with_custom_property(mem_store):
|
||||||
|
camp = Campaign(name="Scipio Africanus",
|
||||||
|
objective="Defeat the Carthaginians",
|
||||||
|
x_empire="Roman",
|
||||||
|
allow_custom=True)
|
||||||
|
|
||||||
|
mem_store.add(camp, True)
|
||||||
|
|
||||||
|
camp_r = mem_store.get(camp.id, True)
|
||||||
|
assert camp_r.id == camp.id
|
||||||
|
assert camp_r.x_empire == camp.x_empire
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_object_with_custom_property_in_bundle(mem_store):
|
||||||
|
camp = Campaign(name="Scipio Africanus",
|
||||||
|
objective="Defeat the Carthaginians",
|
||||||
|
x_empire="Roman",
|
||||||
|
allow_custom=True)
|
||||||
|
|
||||||
|
bundle = Bundle(camp, allow_custom=True)
|
||||||
|
mem_store.add(bundle, True)
|
||||||
|
|
||||||
|
bundle_r = mem_store.get(bundle.id, True)
|
||||||
|
camp_r = bundle_r['objects'][0]
|
||||||
|
assert camp_r.id == camp.id
|
||||||
|
assert camp_r.x_empire == camp.x_empire
|
||||||
|
|
||||||
|
|
||||||
|
def test_memory_store_custom_object(mem_store):
|
||||||
|
@CustomObject('x-new-obj', [
|
||||||
|
('property1', properties.StringProperty(required=True)),
|
||||||
|
])
|
||||||
|
class NewObj():
|
||||||
|
pass
|
||||||
|
|
||||||
|
newobj = NewObj(property1='something')
|
||||||
|
mem_store.add(newobj, True)
|
||||||
|
|
||||||
|
newobj_r = mem_store.get(newobj.id, True)
|
||||||
|
assert newobj_r.id == newobj.id
|
||||||
|
assert newobj_r.property1 == 'something'
|
|
@ -34,7 +34,7 @@ class STIXdatetime(dt.datetime):
|
||||||
|
|
||||||
|
|
||||||
def deduplicate(stix_obj_list):
|
def deduplicate(stix_obj_list):
|
||||||
"""Deduplicate a list of STIX objects to a unique set
|
"""Deduplicate a list of STIX objects to a unique set.
|
||||||
|
|
||||||
Reduces a set of STIX objects to unique set by looking
|
Reduces a set of STIX objects to unique set by looking
|
||||||
at 'id' and 'modified' fields - as a unique object version
|
at 'id' and 'modified' fields - as a unique object version
|
||||||
|
@ -44,7 +44,6 @@ def deduplicate(stix_obj_list):
|
||||||
of deduplicate(),that if the "stix_obj_list" argument has
|
of deduplicate(),that if the "stix_obj_list" argument has
|
||||||
multiple STIX objects of the same version, the last object
|
multiple STIX objects of the same version, the last object
|
||||||
version found in the list will be the one that is returned.
|
version found in the list will be the one that is returned.
|
||||||
()
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
stix_obj_list (list): list of STIX objects (dicts)
|
stix_obj_list (list): list of STIX objects (dicts)
|
||||||
|
@ -56,7 +55,11 @@ def deduplicate(stix_obj_list):
|
||||||
unique_objs = {}
|
unique_objs = {}
|
||||||
|
|
||||||
for obj in stix_obj_list:
|
for obj in stix_obj_list:
|
||||||
unique_objs[(obj['id'], obj['modified'])] = obj
|
try:
|
||||||
|
unique_objs[(obj['id'], obj['modified'])] = obj
|
||||||
|
except KeyError:
|
||||||
|
# Handle objects with no `modified` property, e.g. marking-definition
|
||||||
|
unique_objs[(obj['id'], obj['created'])] = obj
|
||||||
|
|
||||||
return list(unique_objs.values())
|
return list(unique_objs.values())
|
||||||
|
|
||||||
|
@ -244,3 +247,10 @@ def revoke(data):
|
||||||
if data.get("revoked"):
|
if data.get("revoked"):
|
||||||
raise RevokeError("revoke")
|
raise RevokeError("revoke")
|
||||||
return new_version(data, revoked=True)
|
return new_version(data, revoked=True)
|
||||||
|
|
||||||
|
|
||||||
|
def get_class_hierarchy_names(obj):
|
||||||
|
names = []
|
||||||
|
for cls in obj.__class__.__mro__:
|
||||||
|
names.append(cls.__name__)
|
||||||
|
return names
|
||||||
|
|
|
@ -18,9 +18,9 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
|
||||||
WindowsRegistryValueType, WindowsServiceExt,
|
WindowsRegistryValueType, WindowsServiceExt,
|
||||||
X509Certificate, X509V3ExtenstionsType,
|
X509Certificate, X509V3ExtenstionsType,
|
||||||
parse_observable)
|
parse_observable)
|
||||||
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
|
from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject,
|
||||||
IntrusionSet, Malware, ObservedData, Report, ThreatActor,
|
Identity, Indicator, IntrusionSet, Malware, ObservedData,
|
||||||
Tool, Vulnerability)
|
Report, ThreatActor, Tool, Vulnerability)
|
||||||
from .sro import Relationship, Sighting
|
from .sro import Relationship, Sighting
|
||||||
|
|
||||||
OBJ_MAP = {
|
OBJ_MAP = {
|
||||||
|
|
Loading…
Reference in New Issue