Merge pull request #34 from oasis-open/object-factory

Object factory
stix2.1
Greg Back 2017-07-19 13:54:24 +00:00 committed by GitHub
commit 595ba10695
7 changed files with 199 additions and 9 deletions

View File

@ -4,6 +4,7 @@
from . import exceptions
from .bundle import Bundle
from .environment import ObjectFactory
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, CustomObservable, Directory,
DomainName, EmailAddress, EmailMessage,

68
stix2/environment.py Normal file
View File

@ -0,0 +1,68 @@
import copy
class ObjectFactory(object):
"""Object Factory
Used to easily create STIX objects with default values for certain
properties.
Args:
created_by_ref: Default created_by_ref value to apply to all
objects created by this factory.
created: Default created value to apply to all
objects created by this factory.
external_references: Default `external_references` value to apply
to all objects created by this factory.
object_marking_refs: Default `object_marking_refs` value to apply
to all objects created by this factory.
list_append: When a default is set for a list property like
`external_references` or `object_marking_refs` and a value for
that property is passed into `create()`, if this is set to True,
that value will be added to the list alongside the default. If
this is set to False, the passed in value will replace the
default. Defaults to True.
"""
def __init__(self, created_by_ref=None, created=None,
external_references=None, object_marking_refs=None,
list_append=True):
self._defaults = {}
if created_by_ref:
self._defaults['created_by_ref'] = created_by_ref
if created:
self._defaults['created'] = created
# If the user provides a default "created" time, we also want to use
# that as the modified time.
self._defaults['modified'] = created
if external_references:
self._defaults['external_references'] = external_references
if object_marking_refs:
self._defaults['object_marking_refs'] = object_marking_refs
self._list_append = list_append
self._list_properties = ['external_references', 'object_marking_refs']
def create(self, cls, **kwargs):
# Use self.defaults as the base, but update with any explicit args
# provided by the user.
properties = copy.deepcopy(self._defaults)
if kwargs:
if self._list_append:
# Append provided items to list properties instead of replacing them
for list_prop in set(self._list_properties).intersection(kwargs.keys(), properties.keys()):
kwarg_prop = kwargs.pop(list_prop)
if kwarg_prop is None:
del properties[list_prop]
continue
if not isinstance(properties[list_prop], list):
properties[list_prop] = [properties[list_prop]]
if isinstance(kwarg_prop, list):
properties[list_prop].extend(kwarg_prop)
else:
properties[list_prop].append(kwarg_prop)
properties.update(**kwargs)
return cls(**properties)

View File

@ -69,7 +69,7 @@ class MarkingProperty(Property):
class MarkingDefinition(_STIXBase):
_type = 'marking-definition'
_properties = {
'created': TimestampProperty(default=lambda: NOW, required=True),
'created': TimestampProperty(default=lambda: NOW),
'external_references': ListProperty(ExternalReference),
'created_by_ref': ReferenceProperty(type="identity"),
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),

View File

@ -5,7 +5,7 @@ import inspect
import re
import uuid
from six import text_type
from six import string_types, text_type
from .base import _STIXBase
from .exceptions import DictionaryKeyError
@ -101,12 +101,9 @@ class ListProperty(Property):
iter(value)
except TypeError:
raise ValueError("must be an iterable.")
try:
if isinstance(value, basestring):
value = [value]
except NameError:
if isinstance(value, str):
value = [value]
if isinstance(value, (_STIXBase, string_types)):
value = [value]
result = []
for item in value:

View File

@ -20,6 +20,12 @@ TOOL_ID = "tool--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f"
SIGHTING_ID = "sighting--bfbc19db-ec35-4e45-beed-f8bde2a772fb"
VULNERABILITY_ID = "vulnerability--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061"
# Minimum required args for an Identity instance
IDENTITY_KWARGS = dict(
name="John Smith",
identity_class="individual",
)
# Minimum required args for an Indicator instance
INDICATOR_KWARGS = dict(
labels=['malicious-activity'],

View File

@ -0,0 +1,83 @@
import stix2
from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_KWARGS)
def test_object_factory_created_by_ref_str():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.created_by_ref == IDENTITY_ID
def test_object_factory_created_by_ref_obj():
id_obj = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=id_obj)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.created_by_ref == IDENTITY_ID
def test_object_factory_override_default():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
new_id = "identity--983b3172-44fe-4a80-8091-eb8098841fe8"
ind = factory.create(stix2.Indicator, created_by_ref=new_id, **INDICATOR_KWARGS)
assert ind.created_by_ref == new_id
def test_object_factory_created():
factory = stix2.ObjectFactory(created=FAKE_TIME)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.created == FAKE_TIME
assert ind.modified == FAKE_TIME
def test_object_factory_external_resource():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report")
factory = stix2.ObjectFactory(external_references=ext_ref)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.external_references[0].source_name == "ACME Threat Intel"
assert ind.external_references[0].description == "Threat report"
ind2 = factory.create(stix2.Indicator, external_references=None, **INDICATOR_KWARGS)
assert 'external_references' not in ind2
def test_object_factory_obj_markings():
stmt_marking = stix2.StatementMarking("Copyright 2016, Example Corp")
mark_def = stix2.MarkingDefinition(definition_type="statement",
definition=stmt_marking)
factory = stix2.ObjectFactory(object_marking_refs=[mark_def, stix2.TLP_AMBER])
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert mark_def.id in ind.object_marking_refs
assert stix2.TLP_AMBER.id in ind.object_marking_refs
factory = stix2.ObjectFactory(object_marking_refs=stix2.TLP_RED)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert stix2.TLP_RED.id in ind.object_marking_refs
def test_object_factory_list_append():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report from ACME")
ext_ref2 = stix2.ExternalReference(source_name="Yet Another Threat Report",
description="Threat report from YATR")
ext_ref3 = stix2.ExternalReference(source_name="Threat Report #3",
description="One more threat report")
factory = stix2.ObjectFactory(external_references=ext_ref)
ind = factory.create(stix2.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
assert ind.external_references[1].source_name == "Yet Another Threat Report"
ind = factory.create(stix2.Indicator, external_references=[ext_ref2, ext_ref3], **INDICATOR_KWARGS)
assert ind.external_references[2].source_name == "Threat Report #3"
def test_object_factory_list_replace():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report from ACME")
ext_ref2 = stix2.ExternalReference(source_name="Yet Another Threat Report",
description="Threat report from YATR")
factory = stix2.ObjectFactory(external_references=ext_ref, list_append=False)
ind = factory.create(stix2.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
assert len(ind.external_references) == 1
assert ind.external_references[0].source_name == "Yet Another Threat Report"

View File

@ -29,6 +29,19 @@ EXPECTED_STATEMENT_MARKING_DEFINITION = """{
"type": "marking-definition"
}"""
EXPECTED_CAMPAIGN_WITH_OBJECT_MARKING = """{
"created": "2016-04-06T20:03:00.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"modified": "2016-04-06T20:03:00.000Z",
"name": "Green Group Attacks Against Finance",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"type": "campaign"
}"""
EXPECTED_GRANULAR_MARKING = """{
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"selectors": [
@ -84,6 +97,29 @@ def test_marking_def_example_with_positional_statement():
assert str(marking_definition) == EXPECTED_STATEMENT_MARKING_DEFINITION
def test_marking_def_invalid_type():
with pytest.raises(ValueError):
stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="my-definiition-type",
definition=stix2.StatementMarking("Copyright 2016, Example Corp")
)
def test_campaign_with_markings_example():
campaign = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00Z",
modified="2016-04-06T20:03:00Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
object_marking_refs=TLP_WHITE
)
assert str(campaign) == EXPECTED_CAMPAIGN_WITH_OBJECT_MARKING
def test_granular_example():
granular_marking = stix2.GranularMarking(
marking_ref="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
@ -119,7 +155,6 @@ def test_campaign_with_granular_markings_example():
marking_ref="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
selectors=["description"])
])
print(str(campaign))
assert str(campaign) == EXPECTED_CAMPAIGN_WITH_GRANULAR_MARKINGS