master
Desai, Kartikey H 2019-08-12 08:16:00 -04:00
commit dee2f1f60c
39 changed files with 636 additions and 723 deletions

View File

@ -20,7 +20,7 @@
# flake8: noqa
DEFAULT_VERSION = '2.1' # Default version will always be the latest STIX 2.X version
DEFAULT_VERSION = '2.0' # Default version will always be the latest STIX 2.X version
from .confidence import scales
from .core import _collect_stix2_mappings, parse, parse_observable

View File

@ -5,6 +5,7 @@ import copy
import datetime as dt
import simplejson as json
import six
from .exceptions import (
AtLeastOnePropertyError, CustomContentError, DependentPropertiesError,
@ -88,10 +89,25 @@ class _STIXBase(collections.Mapping):
if prop_name in kwargs:
try:
kwargs[prop_name] = prop.clean(kwargs[prop_name])
except ValueError as exc:
if self.__allow_custom and isinstance(exc, CustomContentError):
return
raise InvalidValueError(self.__class__, prop_name, reason=str(exc))
except InvalidValueError:
# No point in wrapping InvalidValueError in another
# InvalidValueError... so let those propagate.
raise
except CustomContentError as exc:
if not self.__allow_custom:
six.raise_from(
InvalidValueError(
self.__class__, prop_name, reason=str(exc),
),
exc,
)
except Exception as exc:
six.raise_from(
InvalidValueError(
self.__class__, prop_name, reason=str(exc),
),
exc,
)
# interproperty constraint methods

View File

@ -169,19 +169,6 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
raise CustomContentError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
EXT_MAP = STIX2_OBJ_MAPS[v]['observable-extensions']
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
try:
ext_class = EXT_MAP[obj['type']][name]
except KeyError:
if not allow_custom:
raise CustomContentError("Can't parse unknown extension type '%s'"
"for observable type '%s'!" % (name, obj['type']))
else: # extension was found
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)

View File

@ -5,7 +5,15 @@ class STIXError(Exception):
"""Base class for errors generated in the stix2 library."""
class InvalidValueError(STIXError, ValueError):
class ObjectConfigurationError(STIXError):
"""
Represents specification violations regarding the composition of STIX
objects.
"""
pass
class InvalidValueError(ObjectConfigurationError):
"""An invalid value was provided to a STIX object's ``__init__``."""
def __init__(self, cls, prop_name, reason):
@ -19,52 +27,89 @@ class InvalidValueError(STIXError, ValueError):
return msg.format(self)
class MissingPropertiesError(STIXError, ValueError):
class PropertyPresenceError(ObjectConfigurationError):
"""
Represents an invalid combination of properties on a STIX object. This
class can be used directly when the object requirements are more
complicated and none of the more specific exception subclasses apply.
"""
def __init__(self, message, cls):
super(PropertyPresenceError, self).__init__(message)
self.cls = cls
class MissingPropertiesError(PropertyPresenceError):
"""Missing one or more required properties when constructing STIX object."""
def __init__(self, cls, properties):
super(MissingPropertiesError, self).__init__()
self.cls = cls
self.properties = sorted(list(properties))
self.properties = sorted(properties)
def __str__(self):
msg = "No values for required properties for {0}: ({1})."
return msg.format(
self.cls.__name__,
msg = "No values for required properties for {0}: ({1}).".format(
cls.__name__,
", ".join(x for x in self.properties),
)
super(MissingPropertiesError, self).__init__(msg, cls)
class ExtraPropertiesError(STIXError, TypeError):
class ExtraPropertiesError(PropertyPresenceError):
"""One or more extra properties were provided when constructing STIX object."""
def __init__(self, cls, properties):
super(ExtraPropertiesError, self).__init__()
self.cls = cls
self.properties = sorted(list(properties))
self.properties = sorted(properties)
def __str__(self):
msg = "Unexpected properties for {0}: ({1})."
return msg.format(
self.cls.__name__,
msg = "Unexpected properties for {0}: ({1}).".format(
cls.__name__,
", ".join(x for x in self.properties),
)
class ImmutableError(STIXError, ValueError):
"""Attempted to modify an object after creation."""
def __init__(self, cls, key):
super(ImmutableError, self).__init__()
self.cls = cls
self.key = key
def __str__(self):
msg = "Cannot modify '{0.key}' property in '{0.cls.__name__}' after creation."
return msg.format(self)
super(ExtraPropertiesError, self).__init__(msg, cls)
class DictionaryKeyError(STIXError, ValueError):
class MutuallyExclusivePropertiesError(PropertyPresenceError):
"""Violating interproperty mutually exclusive constraint of a STIX object type."""
def __init__(self, cls, properties):
self.properties = sorted(properties)
msg = "The ({1}) properties for {0} are mutually exclusive.".format(
cls.__name__,
", ".join(x for x in self.properties),
)
super(MutuallyExclusivePropertiesError, self).__init__(msg, cls)
class DependentPropertiesError(PropertyPresenceError):
"""Violating interproperty dependency constraint of a STIX object type."""
def __init__(self, cls, dependencies):
self.dependencies = dependencies
msg = "The property dependencies for {0}: ({1}) are not met.".format(
cls.__name__,
", ".join(name for x in self.dependencies for name in x),
)
super(DependentPropertiesError, self).__init__(msg, cls)
class AtLeastOnePropertyError(PropertyPresenceError):
"""Violating a constraint of a STIX object type that at least one of the given properties must be populated."""
def __init__(self, cls, properties):
self.properties = sorted(properties)
msg = "At least one of the ({1}) properties for {0} must be " \
"populated.".format(
cls.__name__,
", ".join(x for x in self.properties),
)
super(AtLeastOnePropertyError, self).__init__(msg, cls)
class DictionaryKeyError(ObjectConfigurationError):
"""Dictionary key does not conform to the correct format."""
def __init__(self, key, reason):
@ -77,7 +122,7 @@ class DictionaryKeyError(STIXError, ValueError):
return msg.format(self)
class InvalidObjRefError(STIXError, ValueError):
class InvalidObjRefError(ObjectConfigurationError):
"""A STIX Cyber Observable Object contains an invalid object reference."""
def __init__(self, cls, prop_name, reason):
@ -91,95 +136,7 @@ class InvalidObjRefError(STIXError, ValueError):
return msg.format(self)
class UnmodifiablePropertyError(STIXError, ValueError):
"""Attempted to modify an unmodifiable property of object when creating a new version."""
def __init__(self, unchangable_properties):
super(UnmodifiablePropertyError, self).__init__()
self.unchangable_properties = unchangable_properties
def __str__(self):
msg = "These properties cannot be changed when making a new version: {0}."
return msg.format(", ".join(self.unchangable_properties))
class MutuallyExclusivePropertiesError(STIXError, TypeError):
"""Violating interproperty mutually exclusive constraint of a STIX object type."""
def __init__(self, cls, properties):
super(MutuallyExclusivePropertiesError, self).__init__()
self.cls = cls
self.properties = sorted(list(properties))
def __str__(self):
msg = "The ({1}) properties for {0} are mutually exclusive."
return msg.format(
self.cls.__name__,
", ".join(x for x in self.properties),
)
class DependentPropertiesError(STIXError, TypeError):
"""Violating interproperty dependency constraint of a STIX object type."""
def __init__(self, cls, dependencies):
super(DependentPropertiesError, self).__init__()
self.cls = cls
self.dependencies = dependencies
def __str__(self):
msg = "The property dependencies for {0}: ({1}) are not met."
return msg.format(
self.cls.__name__,
", ".join(name for x in self.dependencies for name in x),
)
class AtLeastOnePropertyError(STIXError, TypeError):
"""Violating a constraint of a STIX object type that at least one of the given properties must be populated."""
def __init__(self, cls, properties):
super(AtLeastOnePropertyError, self).__init__()
self.cls = cls
self.properties = sorted(list(properties))
def __str__(self):
msg = "At least one of the ({1}) properties for {0} must be populated."
return msg.format(
self.cls.__name__,
", ".join(x for x in self.properties),
)
class RevokeError(STIXError, ValueError):
"""Attempted to an operation on a revoked object."""
def __init__(self, called_by):
super(RevokeError, self).__init__()
self.called_by = called_by
def __str__(self):
if self.called_by == "revoke":
return "Cannot revoke an already revoked object."
else:
return "Cannot create a new version of a revoked object."
class ParseError(STIXError, ValueError):
"""Could not parse object."""
def __init__(self, msg):
super(ParseError, self).__init__(msg)
class CustomContentError(STIXError, ValueError):
"""Custom STIX Content (SDO, Observable, Extension, etc.) detected."""
def __init__(self, msg):
super(CustomContentError, self).__init__(msg)
class InvalidSelectorError(STIXError, AssertionError):
class InvalidSelectorError(ObjectConfigurationError):
"""Granular Marking selector violation. The selector must resolve into an existing STIX object property."""
def __init__(self, cls, key):
@ -192,7 +149,73 @@ class InvalidSelectorError(STIXError, AssertionError):
return msg.format(self.key, self.cls.__class__.__name__)
class MarkingNotFoundError(STIXError, AssertionError):
class TLPMarkingDefinitionError(ObjectConfigurationError):
"""Marking violation. The marking-definition for TLP MUST follow the mandated instances from the spec."""
def __init__(self, user_obj, spec_obj):
super(TLPMarkingDefinitionError, self).__init__()
self.user_obj = user_obj
self.spec_obj = spec_obj
def __str__(self):
msg = "Marking {0} does not match spec marking {1}!"
return msg.format(self.user_obj, self.spec_obj)
class ImmutableError(STIXError):
"""Attempted to modify an object after creation."""
def __init__(self, cls, key):
super(ImmutableError, self).__init__()
self.cls = cls
self.key = key
def __str__(self):
msg = "Cannot modify '{0.key}' property in '{0.cls.__name__}' after creation."
return msg.format(self)
class UnmodifiablePropertyError(STIXError):
"""Attempted to modify an unmodifiable property of object when creating a new version."""
def __init__(self, unchangable_properties):
super(UnmodifiablePropertyError, self).__init__()
self.unchangable_properties = unchangable_properties
def __str__(self):
msg = "These properties cannot be changed when making a new version: {0}."
return msg.format(", ".join(self.unchangable_properties))
class RevokeError(STIXError):
"""Attempted an operation on a revoked object."""
def __init__(self, called_by):
super(RevokeError, self).__init__()
self.called_by = called_by
def __str__(self):
if self.called_by == "revoke":
return "Cannot revoke an already revoked object."
else:
return "Cannot create a new version of a revoked object."
class ParseError(STIXError):
"""Could not parse object."""
def __init__(self, msg):
super(ParseError, self).__init__(msg)
class CustomContentError(STIXError):
"""Custom STIX Content (SDO, Observable, Extension, etc.) detected."""
def __init__(self, msg):
super(CustomContentError, self).__init__(msg)
class MarkingNotFoundError(STIXError):
"""Marking violation. The marking reference must be present in SDO or SRO."""
def __init__(self, cls, key):
@ -205,14 +228,8 @@ class MarkingNotFoundError(STIXError, AssertionError):
return msg.format(self.key, self.cls.__class__.__name__)
class TLPMarkingDefinitionError(STIXError, AssertionError):
"""Marking violation. The marking-definition for TLP MUST follow the mandated instances from the spec."""
def __init__(self, user_obj, spec_obj):
super(TLPMarkingDefinitionError, self).__init__()
self.user_obj = user_obj
self.spec_obj = spec_obj
def __str__(self):
msg = "Marking {0} does not match spec marking {1}!"
return msg.format(self.user_obj, self.spec_obj)
class STIXDeprecationWarning(DeprecationWarning):
"""
Represents usage of a deprecated component of a STIX specification.
"""
pass

View File

@ -102,7 +102,7 @@ class Property(object):
- Return a value that is valid for this property. If ``value`` is not
valid for this property, this will attempt to transform it first. If
``value`` is not valid and no such transformation is possible, it
should raise a ValueError.
should raise an exception.
- ``def default(self):``
- provide a default value for this property.
- ``default()`` can return the special value ``NOW`` to use the current

View File

@ -1,37 +1,43 @@
import importlib
import os
import stix2
from stix2.workbench import (
AttackPattern, Campaign, CourseOfAction, ExternalReference,
FileSystemSource, Filter, Identity, Indicator, IntrusionSet, Malware,
MarkingDefinition, ObservedData, Relationship, Report, StatementMarking,
ThreatActor, Tool, Vulnerability, add_data_source, all_versions,
attack_patterns, campaigns, courses_of_action, create, get, identities,
indicators, intrusion_sets, malware, observed_data, query, reports, save,
set_default_created, set_default_creator, set_default_external_refs,
_STIX_VID, AttackPattern, Bundle, Campaign, CourseOfAction,
ExternalReference, File, FileSystemSource, Filter, Identity, Indicator,
IntrusionSet, Malware, MarkingDefinition, NTFSExt, ObservedData,
Relationship, Report, StatementMarking, ThreatActor, Tool, Vulnerability,
add_data_source, all_versions, attack_patterns, campaigns,
courses_of_action, create, get, identities, indicators, intrusion_sets,
malware, observed_data, query, reports, save, set_default_created,
set_default_creator, set_default_external_refs,
set_default_object_marking_refs, threat_actors, tools, vulnerabilities,
)
from .constants import (
ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS,
COURSE_OF_ACTION_ID, COURSE_OF_ACTION_KWARGS, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID, INTRUSION_SET_KWARGS,
MALWARE_ID, MALWARE_KWARGS, OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS,
REPORT_ID, REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, TOOL_ID,
TOOL_KWARGS, VULNERABILITY_ID, VULNERABILITY_KWARGS,
# Auto-detect some settings based on the current default STIX version
_STIX_DATA_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
_STIX_VID,
"stix2_data",
)
_STIX_CONSTANTS_MODULE = "stix2.test." + _STIX_VID + ".constants"
constants = importlib.import_module(_STIX_CONSTANTS_MODULE)
def test_workbench_environment():
# Create a STIX object
ind = create(Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
ind = create(
Indicator, id=constants.INDICATOR_ID, **constants.INDICATOR_KWARGS
)
save(ind)
resp = get(INDICATOR_ID)
resp = get(constants.INDICATOR_ID)
assert resp['labels'][0] == 'malicious-activity'
resp = all_versions(INDICATOR_ID)
resp = all_versions(constants.INDICATOR_ID)
assert len(resp) == 1
# Search on something other than id
@ -41,176 +47,193 @@ def test_workbench_environment():
def test_workbench_get_all_attack_patterns():
mal = AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
mal = AttackPattern(
id=constants.ATTACK_PATTERN_ID, **constants.ATTACK_PATTERN_KWARGS
)
save(mal)
resp = attack_patterns()
assert len(resp) == 1
assert resp[0].id == ATTACK_PATTERN_ID
assert resp[0].id == constants.ATTACK_PATTERN_ID
def test_workbench_get_all_campaigns():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
cam = Campaign(id=constants.CAMPAIGN_ID, **constants.CAMPAIGN_KWARGS)
save(cam)
resp = campaigns()
assert len(resp) == 1
assert resp[0].id == CAMPAIGN_ID
assert resp[0].id == constants.CAMPAIGN_ID
def test_workbench_get_all_courses_of_action():
coa = CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
coa = CourseOfAction(
id=constants.COURSE_OF_ACTION_ID, **constants.COURSE_OF_ACTION_KWARGS
)
save(coa)
resp = courses_of_action()
assert len(resp) == 1
assert resp[0].id == COURSE_OF_ACTION_ID
assert resp[0].id == constants.COURSE_OF_ACTION_ID
def test_workbench_get_all_identities():
idty = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
idty = Identity(id=constants.IDENTITY_ID, **constants.IDENTITY_KWARGS)
save(idty)
resp = identities()
assert len(resp) == 1
assert resp[0].id == IDENTITY_ID
assert resp[0].id == constants.IDENTITY_ID
def test_workbench_get_all_indicators():
resp = indicators()
assert len(resp) == 1
assert resp[0].id == INDICATOR_ID
assert resp[0].id == constants.INDICATOR_ID
def test_workbench_get_all_intrusion_sets():
ins = IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
ins = IntrusionSet(
id=constants.INTRUSION_SET_ID, **constants.INTRUSION_SET_KWARGS
)
save(ins)
resp = intrusion_sets()
assert len(resp) == 1
assert resp[0].id == INTRUSION_SET_ID
assert resp[0].id == constants.INTRUSION_SET_ID
def test_workbench_get_all_malware():
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
mal = Malware(id=constants.MALWARE_ID, **constants.MALWARE_KWARGS)
save(mal)
resp = malware()
assert len(resp) == 1
assert resp[0].id == MALWARE_ID
assert resp[0].id == constants.MALWARE_ID
def test_workbench_get_all_observed_data():
od = ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
od = ObservedData(
id=constants.OBSERVED_DATA_ID, **constants.OBSERVED_DATA_KWARGS
)
save(od)
resp = observed_data()
assert len(resp) == 1
assert resp[0].id == OBSERVED_DATA_ID
assert resp[0].id == constants.OBSERVED_DATA_ID
def test_workbench_get_all_reports():
rep = Report(id=REPORT_ID, **REPORT_KWARGS)
rep = Report(id=constants.REPORT_ID, **constants.REPORT_KWARGS)
save(rep)
resp = reports()
assert len(resp) == 1
assert resp[0].id == REPORT_ID
assert resp[0].id == constants.REPORT_ID
def test_workbench_get_all_threat_actors():
thr = ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
thr = ThreatActor(
id=constants.THREAT_ACTOR_ID, **constants.THREAT_ACTOR_KWARGS
)
save(thr)
resp = threat_actors()
assert len(resp) == 1
assert resp[0].id == THREAT_ACTOR_ID
assert resp[0].id == constants.THREAT_ACTOR_ID
def test_workbench_get_all_tools():
tool = Tool(id=TOOL_ID, **TOOL_KWARGS)
tool = Tool(id=constants.TOOL_ID, **constants.TOOL_KWARGS)
save(tool)
resp = tools()
assert len(resp) == 1
assert resp[0].id == TOOL_ID
assert resp[0].id == constants.TOOL_ID
def test_workbench_get_all_vulnerabilities():
vuln = Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vuln = Vulnerability(
id=constants.VULNERABILITY_ID, **constants.VULNERABILITY_KWARGS
)
save(vuln)
resp = vulnerabilities()
assert len(resp) == 1
assert resp[0].id == VULNERABILITY_ID
assert resp[0].id == constants.VULNERABILITY_ID
def test_workbench_add_to_bundle():
vuln = Vulnerability(**VULNERABILITY_KWARGS)
bundle = stix2.v20.Bundle(vuln)
vuln = Vulnerability(**constants.VULNERABILITY_KWARGS)
bundle = Bundle(vuln)
assert bundle.objects[0].name == 'Heartbleed'
def test_workbench_relationships():
rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID)
rel = Relationship(
constants.INDICATOR_ID, 'indicates', constants.MALWARE_ID,
)
save(rel)
ind = get(INDICATOR_ID)
ind = get(constants.INDICATOR_ID)
resp = ind.relationships()
assert len(resp) == 1
assert resp[0].relationship_type == 'indicates'
assert resp[0].source_ref == INDICATOR_ID
assert resp[0].target_ref == MALWARE_ID
assert resp[0].source_ref == constants.INDICATOR_ID
assert resp[0].target_ref == constants.MALWARE_ID
def test_workbench_created_by():
intset = IntrusionSet(name="Breach 123", created_by_ref=IDENTITY_ID)
intset = IntrusionSet(
name="Breach 123", created_by_ref=constants.IDENTITY_ID,
)
save(intset)
creator = intset.created_by()
assert creator.id == IDENTITY_ID
assert creator.id == constants.IDENTITY_ID
def test_workbench_related():
rel1 = Relationship(MALWARE_ID, 'targets', IDENTITY_ID)
rel2 = Relationship(CAMPAIGN_ID, 'uses', MALWARE_ID)
rel1 = Relationship(constants.MALWARE_ID, 'targets', constants.IDENTITY_ID)
rel2 = Relationship(constants.CAMPAIGN_ID, 'uses', constants.MALWARE_ID)
save([rel1, rel2])
resp = get(MALWARE_ID).related()
resp = get(constants.MALWARE_ID).related()
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
assert any(x['id'] == constants.CAMPAIGN_ID for x in resp)
assert any(x['id'] == constants.INDICATOR_ID for x in resp)
assert any(x['id'] == constants.IDENTITY_ID for x in resp)
resp = get(MALWARE_ID).related(relationship_type='indicates')
resp = get(constants.MALWARE_ID).related(relationship_type='indicates')
assert len(resp) == 1
def test_workbench_related_with_filters():
malware = Malware(labels=["ransomware"], name="CryptorBit", created_by_ref=IDENTITY_ID)
rel = Relationship(malware.id, 'variant-of', MALWARE_ID)
malware = Malware(
labels=["ransomware"], name="CryptorBit", created_by_ref=constants.IDENTITY_ID,
)
rel = Relationship(malware.id, 'variant-of', constants.MALWARE_ID)
save([malware, rel])
filters = [Filter('created_by_ref', '=', IDENTITY_ID)]
resp = get(MALWARE_ID).related(filters=filters)
filters = [Filter('created_by_ref', '=', constants.IDENTITY_ID)]
resp = get(constants.MALWARE_ID).related(filters=filters)
assert len(resp) == 1
assert resp[0].name == malware.name
assert resp[0].created_by_ref == IDENTITY_ID
assert resp[0].created_by_ref == constants.IDENTITY_ID
# filters arg can also be single filter
resp = get(MALWARE_ID).related(filters=filters[0])
resp = get(constants.MALWARE_ID).related(filters=filters[0])
assert len(resp) == 1
def test_add_data_source():
fs_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
fs = FileSystemSource(fs_path)
fs = FileSystemSource(_STIX_DATA_PATH)
add_data_source(fs)
resp = tools()
assert len(resp) == 3
resp_ids = [tool.id for tool in resp]
assert TOOL_ID in resp_ids
assert constants.TOOL_ID in resp_ids
assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids
assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids
@ -229,22 +252,28 @@ def test_additional_filters_list():
def test_default_creator():
set_default_creator(IDENTITY_ID)
campaign = Campaign(**CAMPAIGN_KWARGS)
set_default_creator(constants.IDENTITY_ID)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert 'created_by_ref' not in CAMPAIGN_KWARGS
assert campaign.created_by_ref == IDENTITY_ID
assert 'created_by_ref' not in constants.CAMPAIGN_KWARGS
assert campaign.created_by_ref == constants.IDENTITY_ID
# turn off side-effects to avoid affecting future tests
set_default_creator(None)
def test_default_created_timestamp():
timestamp = "2018-03-19T01:02:03.000Z"
set_default_created(timestamp)
campaign = Campaign(**CAMPAIGN_KWARGS)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert 'created' not in CAMPAIGN_KWARGS
assert 'created' not in constants.CAMPAIGN_KWARGS
assert stix2.utils.format_datetime(campaign.created) == timestamp
assert stix2.utils.format_datetime(campaign.modified) == timestamp
# turn off side-effects to avoid affecting future tests
set_default_created(None)
def test_default_external_refs():
ext_ref = ExternalReference(
@ -252,11 +281,14 @@ def test_default_external_refs():
description="Threat report",
)
set_default_external_refs(ext_ref)
campaign = Campaign(**CAMPAIGN_KWARGS)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert campaign.external_references[0].source_name == "ACME Threat Intel"
assert campaign.external_references[0].description == "Threat report"
# turn off side-effects to avoid affecting future tests
set_default_external_refs([])
def test_default_object_marking_refs():
stmt_marking = StatementMarking("Copyright 2016, Example Corp")
@ -265,18 +297,21 @@ def test_default_object_marking_refs():
definition=stmt_marking,
)
set_default_object_marking_refs(mark_def)
campaign = Campaign(**CAMPAIGN_KWARGS)
campaign = Campaign(**constants.CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id
# turn off side-effects to avoid affecting future tests
set_default_object_marking_refs([])
def test_workbench_custom_property_object_in_observable_extension():
ntfs = stix2.v20.NTFSExt(
ntfs = NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = stix2.v20.File(
artifact = File(
name='test',
extensions={'ntfs-ext': ntfs},
)
@ -293,7 +328,7 @@ def test_workbench_custom_property_object_in_observable_extension():
def test_workbench_custom_property_dict_in_observable_extension():
artifact = stix2.v20.File(
artifact = File(
allow_custom=True,
name='test',
extensions={

View File

@ -4,6 +4,7 @@ import pytest
import stix2
from ...exceptions import InvalidValueError
from .constants import IDENTITY_ID
EXPECTED_BUNDLE = """{
@ -156,15 +157,15 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh
def test_create_bundle_invalid(indicator, malware, relationship):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.Bundle(objects=[1])
assert excinfo.value.reason == "This property may only contain a dictionary or object"
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.Bundle(objects=[{}])
assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object"
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.Bundle(objects=[{'type': 'bundle'}])
assert excinfo.value.reason == 'This property may not contain a Bundle object'
@ -175,7 +176,7 @@ def test_parse_bundle(version):
assert bundle.type == "bundle"
assert bundle.id.startswith("bundle--")
assert type(bundle.objects[0]) is stix2.v20.Indicator
assert isinstance(bundle.objects[0], stix2.v20.Indicator)
assert bundle.objects[0].type == 'indicator'
assert bundle.objects[1].type == 'malware'
assert bundle.objects[2].type == 'relationship'
@ -232,7 +233,7 @@ def test_bundle_with_different_spec_objects():
},
]
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.Bundle(objects=data)
assert "Spec version 2.0 bundles don't yet support containing objects of a different spec version." in str(excinfo.value)

View File

@ -74,7 +74,9 @@ def test_register_object_with_version():
v = 'v20'
assert bundle.objects[0].type in core.STIX2_OBJ_MAPS[v]['objects']
assert v in str(bundle.objects[0].__class__)
# spec_version is not in STIX 2.0, and is required in 2.1, so this
# suffices as a test for a STIX 2.0 object.
assert "spec_version" not in bundle.objects[0]
def test_register_marking_with_version():

View File

@ -2,6 +2,7 @@ import pytest
import stix2
from ...exceptions import InvalidValueError
from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID
IDENTITY_CUSTOM_PROP = stix2.v20.Identity(
@ -95,7 +96,7 @@ def test_identity_custom_property_allowed():
def test_parse_identity_custom_property(data):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
stix2.parse(data, version="2.0")
assert excinfo.value.cls == stix2.v20.Identity
assert issubclass(excinfo.value.cls, stix2.v20.Identity)
assert excinfo.value.properties == ['foo']
assert "Unexpected properties for" in str(excinfo.value)
@ -133,7 +134,7 @@ def test_custom_property_dict_in_bundled_object():
'identity_class': 'individual',
'x_foo': 'bar',
}
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(InvalidValueError):
stix2.v20.Bundle(custom_identity)
bundle = stix2.v20.Bundle(custom_identity, allow_custom=True)
@ -199,7 +200,7 @@ def test_custom_property_object_in_observable_extension():
def test_custom_property_dict_in_observable_extension():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(InvalidValueError):
stix2.v20.File(
name='test',
extensions={
@ -718,7 +719,7 @@ def test_custom_extension():
def test_custom_extension_wrong_observable_type():
# NewExtension is an extension of DomainName, not File
ext = NewExtension(property1='something')
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.File(
name="abc.txt",
extensions={
@ -911,7 +912,7 @@ def test_parse_observable_with_custom_extension():
],
)
def test_parse_observable_with_unregistered_custom_extension(data):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.parse_observable(data, version='2.0')
assert "Can't parse unknown extension type" in str(excinfo.value)

View File

@ -2,7 +2,7 @@
import pytest
from stix2 import markings
from stix2.exceptions import MarkingNotFoundError
from stix2.exceptions import InvalidSelectorError, MarkingNotFoundError
from stix2.v20 import TLP_RED, Malware
from .constants import MALWARE_MORE_KWARGS as MALWARE_KWARGS_CONST
@ -179,7 +179,7 @@ def test_add_marking_mark_same_property_same_marking():
],
)
def test_add_marking_bad_selector(data, marking):
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.add_markings(data, marking[0], marking[1])
@ -299,7 +299,7 @@ def test_get_markings_multiple_selectors(data):
)
def test_get_markings_bad_selector(data, selector):
"""Test bad selectors raise exception"""
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.get_markings(data, selector)
@ -560,7 +560,7 @@ def test_remove_marking_bad_selector():
before = {
"description": "test description",
}
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.remove_markings(before, ["marking-definition--1", "marking-definition--2"], ["title"])
@ -642,7 +642,7 @@ def test_is_marked_smoke(data):
)
def test_is_marked_invalid_selector(data, selector):
"""Test invalid selector raises an error."""
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.is_marked(data, selectors=selector)
@ -836,7 +836,7 @@ def test_is_marked_positional_arguments_combinations():
def test_create_sdo_with_invalid_marking():
with pytest.raises(AssertionError) as excinfo:
with pytest.raises(InvalidSelectorError) as excinfo:
Malware(
granular_markings=[
{
@ -974,7 +974,7 @@ def test_set_marking_bad_selector(marking):
**MALWARE_KWARGS
)
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
before = markings.set_markings(before, marking[0], marking[1])
assert before == after
@ -1080,7 +1080,7 @@ def test_clear_marking_all_selectors(data):
)
def test_clear_marking_bad_selector(data, selector):
"""Test bad selector raises exception."""
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.clear_markings(data, selector)

View File

@ -6,6 +6,7 @@ import pytz
import stix2
from ...exceptions import InvalidValueError
from .constants import FAKE_TIME, MALWARE_ID, MALWARE_KWARGS
EXPECTED_MALWARE = """{
@ -145,7 +146,7 @@ def test_parse_malware(data):
def test_parse_malware_invalid_labels():
data = re.compile('\\[.+\\]', re.DOTALL).sub('1', EXPECTED_MALWARE)
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.parse(data, version="2.0")
assert "Invalid value for Malware 'labels'" in str(excinfo.value)

View File

@ -4,6 +4,7 @@ import pytest
from stix2 import exceptions, markings
from stix2.v20 import TLP_AMBER, Malware
from ...exceptions import MarkingNotFoundError
from .constants import FAKE_TIME, MALWARE_ID
from .constants import MALWARE_KWARGS as MALWARE_KWARGS_CONST
from .constants import MARKING_IDS
@ -350,7 +351,7 @@ def test_remove_markings_bad_markings():
object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS
)
with pytest.raises(AssertionError) as excinfo:
with pytest.raises(MarkingNotFoundError) as excinfo:
markings.remove_markings(before, [MARKING_IDS[4]], None)
assert str(excinfo.value) == "Marking ['%s'] was not found in Malware!" % MARKING_IDS[4]

View File

@ -6,6 +6,7 @@ import pytz
import stix2
from ...exceptions import InvalidValueError
from .constants import IDENTITY_ID, OBSERVED_DATA_ID
OBJECTS_REGEX = re.compile('\"objects\": {(?:.*?)(?:(?:[^{]*?)|(?:{[^{]*?}))*}', re.DOTALL)
@ -239,7 +240,7 @@ def test_parse_artifact_valid(data):
)
def test_parse_artifact_invalid(data):
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
with pytest.raises(ValueError):
with pytest.raises(InvalidValueError):
stix2.parse(odata_str, version="2.0")
@ -468,11 +469,10 @@ def test_parse_email_message_with_at_least_one_error(data):
"4": "artifact",
"5": "file",
}
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.parse_observable(data, valid_refs, version='2.0')
assert excinfo.value.cls == stix2.v20.EmailMIMEComponent
assert excinfo.value.properties == ["body", "body_raw_ref"]
assert excinfo.value.cls == stix2.v20.EmailMessage
assert "At least one of the" in str(excinfo.value)
assert "must be populated" in str(excinfo.value)
@ -734,7 +734,7 @@ def test_file_example_with_NTFSExt():
def test_file_example_with_empty_NTFSExt():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.File(
name="abc.txt",
extensions={
@ -742,8 +742,7 @@ def test_file_example_with_empty_NTFSExt():
},
)
assert excinfo.value.cls == stix2.v20.NTFSExt
assert excinfo.value.properties == sorted(list(stix2.NTFSExt._properties.keys()))
assert excinfo.value.cls == stix2.v20.File
def test_file_example_with_PDFExt():
@ -1112,16 +1111,14 @@ def test_process_example_empty_error():
def test_process_example_empty_with_extensions():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.Process(
extensions={
"windows-process-ext": {},
},
)
assert excinfo.value.cls == stix2.v20.WindowsProcessExt
properties_of_extension = list(stix2.v20.WindowsProcessExt._properties.keys())
assert excinfo.value.properties == sorted(properties_of_extension)
assert excinfo.value.cls == stix2.v20.Process
def test_process_example_windows_process_ext():
@ -1144,7 +1141,7 @@ def test_process_example_windows_process_ext():
def test_process_example_windows_process_ext_empty():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v20.Process(
pid=1221,
name="gedit-bin",
@ -1153,9 +1150,7 @@ def test_process_example_windows_process_ext_empty():
},
)
assert excinfo.value.cls == stix2.v20.WindowsProcessExt
properties_of_extension = list(stix2.v20.WindowsProcessExt._properties.keys())
assert excinfo.value.properties == sorted(properties_of_extension)
assert excinfo.value.cls == stix2.v20.Process
def test_process_example_extensions_empty():
@ -1289,7 +1284,7 @@ def test_user_account_unix_account_ext_example():
def test_windows_registry_key_example():
with pytest.raises(ValueError):
with pytest.raises(InvalidValueError):
stix2.v20.WindowsRegistryValueType(
name="Foo",
data="qwerty",

View File

@ -3,7 +3,9 @@ import uuid
import pytest
import stix2
from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
from stix2.exceptions import (
AtLeastOnePropertyError, CustomContentError, DictionaryKeyError,
)
from stix2.properties import (
BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty,
@ -465,23 +467,27 @@ def test_extension_property_valid():
})
@pytest.mark.parametrize(
"data", [
1,
{'foobar-ext': {
'pe_type': 'exe',
}},
],
)
def test_extension_property_invalid(data):
def test_extension_property_invalid1():
ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='file')
with pytest.raises(ValueError):
ext_prop.clean(data)
ext_prop.clean(1)
def test_extension_property_invalid2():
ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='file')
with pytest.raises(CustomContentError):
ext_prop.clean(
{
'foobar-ext': {
'pe_type': 'exe',
},
},
)
def test_extension_property_invalid_type():
ext_prop = ExtensionsProperty(spec_version="2.0", enclosing_type='indicator')
with pytest.raises(ValueError) as excinfo:
with pytest.raises(CustomContentError) as excinfo:
ext_prop.clean(
{
'windows-pebinary-ext': {

View File

@ -71,6 +71,7 @@ def stix_objs1():
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -84,6 +85,7 @@ def stix_objs1():
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -97,6 +99,7 @@ def stix_objs1():
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -110,6 +113,7 @@ def stix_objs1():
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -123,6 +127,7 @@ def stix_objs1():
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -140,6 +145,7 @@ def stix_objs2():
],
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern_type": "stix",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"spec_version": "2.1",
"type": "indicator",
@ -153,6 +159,7 @@ def stix_objs2():
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern_type": "stix",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"spec_version": "2.1",
"type": "indicator",
@ -166,6 +173,7 @@ def stix_objs2():
],
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern_type": "stix",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"spec_version": "2.1",
"type": "indicator",

View File

@ -75,6 +75,10 @@ COURSE_OF_ACTION_KWARGS = dict(
GROUPING_KWARGS = dict(
name="Harry Potter and the Leet Hackers",
context="suspicious-activity",
object_refs=[
"malware--c8d2fae5-7271-400c-b81d-931a4caf20b9",
"identity--988145ed-a3b4-4421-b7a7-273376be67ce",
],
)
IDENTITY_KWARGS = dict(
@ -84,6 +88,7 @@ IDENTITY_KWARGS = dict(
INDICATOR_KWARGS = dict(
indicator_types=['malicious-activity'],
pattern_type="stix",
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
valid_from="2017-01-01T12:34:56Z",
)

View File

@ -4,6 +4,7 @@ import pytest
import stix2
from ...exceptions import InvalidValueError
from .constants import IDENTITY_ID
EXPECTED_BUNDLE = """{
@ -20,6 +21,7 @@ EXPECTED_BUNDLE = """{
"malicious-activity"
],
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"pattern_type": "stix",
"valid_from": "2017-01-01T12:34:56Z"
},
{
@ -58,6 +60,7 @@ EXPECTED_BUNDLE_DICT = {
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"pattern_type": "stix",
"valid_from": "2017-01-01T12:34:56Z",
"indicator_types": [
"malicious-activity",
@ -162,15 +165,15 @@ def test_create_bundle_with_arg_listarg_and_kwarg(indicator, malware, relationsh
def test_create_bundle_invalid(indicator, malware, relationship):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v21.Bundle(objects=[1])
assert excinfo.value.reason == "This property may only contain a dictionary or object"
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v21.Bundle(objects=[{}])
assert excinfo.value.reason == "This property may only contain a non-empty dictionary or object"
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v21.Bundle(objects=[{'type': 'bundle'}])
assert excinfo.value.reason == 'This property may not contain a Bundle object'
@ -181,7 +184,7 @@ def test_parse_bundle(version):
assert bundle.type == "bundle"
assert bundle.id.startswith("bundle--")
assert type(bundle.objects[0]) is stix2.v21.Indicator
assert isinstance(bundle.objects[0], stix2.v21.Indicator)
assert bundle.objects[0].type == 'indicator'
assert bundle.objects[1].type == 'malware'
assert bundle.objects[2].type == 'relationship'
@ -234,6 +237,7 @@ def test_bundle_obj_id_found():
"malicious-activity",
],
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"pattern_type": "stix",
"valid_from": "2017-01-01T12:34:56Z",
},
{

View File

@ -16,6 +16,7 @@ BUNDLE = {
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"pattern_type": "stix",
"valid_from": "2017-01-01T12:34:56Z",
"indicator_types": [
"malicious-activity",
@ -78,7 +79,7 @@ def test_register_object_with_version():
v = 'v21'
assert bundle.objects[0].type in core.STIX2_OBJ_MAPS[v]['objects']
assert v in str(bundle.objects[0].__class__)
assert bundle.objects[0].spec_version == "2.1"
def test_register_marking_with_version():

View File

@ -3,6 +3,7 @@ import pytest
import stix2
import stix2.base
from ...exceptions import InvalidValueError
from .constants import FAKE_TIME, IDENTITY_ID, MARKING_DEFINITION_ID
IDENTITY_CUSTOM_PROP = stix2.v21.Identity(
@ -97,7 +98,7 @@ def test_identity_custom_property_allowed():
def test_parse_identity_custom_property(data):
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
stix2.parse(data, version="2.1")
assert excinfo.value.cls == stix2.v21.Identity
assert issubclass(excinfo.value.cls, stix2.v21.Identity)
assert excinfo.value.properties == ['foo']
assert "Unexpected properties for" in str(excinfo.value)
@ -136,7 +137,7 @@ def test_custom_property_dict_in_bundled_object():
'identity_class': 'individual',
'x_foo': 'bar',
}
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(InvalidValueError):
stix2.v21.Bundle(custom_identity)
bundle = stix2.v21.Bundle(custom_identity, allow_custom=True)
@ -203,7 +204,7 @@ def test_custom_property_object_in_observable_extension():
def test_custom_property_dict_in_observable_extension():
with pytest.raises(stix2.exceptions.ExtraPropertiesError):
with pytest.raises(InvalidValueError):
stix2.v21.File(
name='test',
extensions={
@ -722,7 +723,7 @@ def test_custom_extension():
def test_custom_extension_wrong_observable_type():
# NewExtension is an extension of DomainName, not File
ext = NewExtension(property1='something')
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.v21.File(
name="abc.txt",
extensions={
@ -915,7 +916,7 @@ def test_parse_observable_with_custom_extension():
],
)
def test_parse_observable_with_unregistered_custom_extension(data):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.parse_observable(data, version='2.1')
assert "Can't parse unknown extension type" in str(excinfo.value)

View File

@ -29,6 +29,7 @@ stix_objs = [
"modified": "2014-05-08T09:00:00.000Z",
"name": "File hash for Poison Ivy variant",
"pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2014-05-08T09:00:00.000000Z",

View File

@ -24,6 +24,7 @@ IND1 = {
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -37,6 +38,7 @@ IND2 = {
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -50,6 +52,7 @@ IND3 = {
"modified": "2017-01-27T13:49:53.936Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -63,6 +66,7 @@ IND4 = {
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -76,6 +80,7 @@ IND5 = {
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -89,6 +94,7 @@ IND6 = {
"modified": "2017-01-31T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -102,6 +108,7 @@ IND7 = {
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",
@ -115,6 +122,7 @@ IND8 = {
"modified": "2017-01-27T13:49:53.935Z",
"name": "Malicious site hosting downloader",
"pattern": "[url:value = 'http://x4z9arb.cn/4712']",
"pattern_type": "stix",
"spec_version": "2.1",
"type": "indicator",
"valid_from": "2017-01-27T13:49:53.935382Z",

View File

@ -1,7 +1,7 @@
import pytest
from stix2 import markings
from stix2.exceptions import MarkingNotFoundError
from stix2.exceptions import InvalidSelectorError, MarkingNotFoundError
from stix2.v21 import TLP_RED, Malware
from .constants import MALWARE_MORE_KWARGS as MALWARE_KWARGS_CONST
@ -209,7 +209,7 @@ def test_add_marking_mark_same_property_same_marking():
],
)
def test_add_marking_bad_selector(data, marking):
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.add_markings(data, marking[0], marking[1])
@ -329,7 +329,7 @@ def test_get_markings_multiple_selectors(data):
)
def test_get_markings_bad_selector(data, selector):
"""Test bad selectors raise exception"""
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.get_markings(data, selector)
@ -714,7 +714,7 @@ def test_remove_marking_bad_selector():
before = {
"description": "test description",
}
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.remove_markings(before, ["marking-definition--1", "marking-definition--2"], ["title"])
@ -805,7 +805,7 @@ def test_is_marked_smoke(data):
)
def test_is_marked_invalid_selector(data, selector):
"""Test invalid selector raises an error."""
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.is_marked(data, selectors=selector)
@ -1000,7 +1000,7 @@ def test_is_marked_positional_arguments_combinations():
def test_create_sdo_with_invalid_marking():
with pytest.raises(AssertionError) as excinfo:
with pytest.raises(InvalidSelectorError) as excinfo:
Malware(
granular_markings=[
{
@ -1192,7 +1192,7 @@ def test_set_marking_bad_selector(marking):
**MALWARE_KWARGS
)
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
before = markings.set_markings(before, marking[0], marking[1])
assert before == after
@ -1298,7 +1298,7 @@ def test_clear_marking_all_selectors(data):
)
def test_clear_marking_bad_selector(data, selector):
"""Test bad selector raises exception."""
with pytest.raises(AssertionError):
with pytest.raises(InvalidSelectorError):
markings.clear_markings(data, selector)

View File

@ -14,7 +14,11 @@ EXPECTED_GROUPING = """{
"created": "2017-01-01T12:34:56.000Z",
"modified": "2017-01-01T12:34:56.000Z",
"name": "Harry Potter and the Leet Hackers",
"context": "suspicious-activity"
"context": "suspicious-activity",
"object_refs": [
"malware--c8d2fae5-7271-400c-b81d-931a4caf20b9",
"identity--988145ed-a3b4-4421-b7a7-273376be67ce"
]
}"""
@ -28,6 +32,10 @@ def test_grouping_with_all_required_properties():
modified=now,
name="Harry Potter and the Leet Hackers",
context="suspicious-activity",
object_refs=[
"malware--c8d2fae5-7271-400c-b81d-931a4caf20b9",
"identity--988145ed-a3b4-4421-b7a7-273376be67ce",
],
)
assert str(grp) == EXPECTED_GROUPING
@ -74,7 +82,7 @@ def test_grouping_required_properties():
stix2.v21.Grouping()
assert excinfo.value.cls == stix2.v21.Grouping
assert excinfo.value.properties == ["context"]
assert excinfo.value.properties == ["context", "object_refs"]
def test_invalid_kwarg_to_grouping():
@ -97,6 +105,10 @@ def test_invalid_kwarg_to_grouping():
"modified": "2017-01-01T12:34:56.000Z",
"name": "Harry Potter and the Leet Hackers",
"context": "suspicious-activity",
"object_refs": [
"malware--c8d2fae5-7271-400c-b81d-931a4caf20b9",
"identity--988145ed-a3b4-4421-b7a7-273376be67ce",
],
},
],
)
@ -110,3 +122,7 @@ def test_parse_grouping(data):
assert grp.modified == dt.datetime(2017, 1, 1, 12, 34, 56, tzinfo=pytz.utc)
assert grp.name == "Harry Potter and the Leet Hackers"
assert grp.context == "suspicious-activity"
assert grp.object_refs == [
"malware--c8d2fae5-7271-400c-b81d-931a4caf20b9",
"identity--988145ed-a3b4-4421-b7a7-273376be67ce",
]

View File

@ -18,6 +18,7 @@ EXPECTED_INDICATOR = """{
"malicious-activity"
],
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"pattern_type": "stix",
"valid_from": "1970-01-01T00:00:01Z"
}"""
@ -29,6 +30,7 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
modified='2017-01-01T00:00:01.000Z',
indicator_types=['malicious-activity'],
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
pattern_type='stix',
valid_from='1970-01-01T00:00:01Z'
""".split()) + ")"
@ -43,6 +45,7 @@ def test_indicator_with_all_required_properties():
created=now,
modified=now,
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
pattern_type="stix",
valid_from=epoch,
indicator_types=['malicious-activity'],
)
@ -98,8 +101,8 @@ def test_indicator_required_properties():
stix2.v21.Indicator()
assert excinfo.value.cls == stix2.v21.Indicator
assert excinfo.value.properties == ["indicator_types", "pattern", "valid_from"]
assert str(excinfo.value) == "No values for required properties for Indicator: (indicator_types, pattern, valid_from)."
assert excinfo.value.properties == ["indicator_types", "pattern", "pattern_type", "valid_from"]
assert str(excinfo.value) == "No values for required properties for Indicator: (indicator_types, pattern, pattern_type, valid_from)."
def test_indicator_required_property_pattern():
@ -107,7 +110,7 @@ def test_indicator_required_property_pattern():
stix2.v21.Indicator(indicator_types=['malicious-activity'])
assert excinfo.value.cls == stix2.v21.Indicator
assert excinfo.value.properties == ["pattern", "valid_from"]
assert excinfo.value.properties == ["pattern", "pattern_type", "valid_from"]
def test_indicator_created_ref_invalid_format():
@ -162,6 +165,7 @@ def test_created_modified_time_are_identical_by_default():
"malicious-activity",
],
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"pattern_type": "stix",
"valid_from": "1970-01-01T00:00:01Z",
},
],
@ -184,6 +188,7 @@ def test_invalid_indicator_pattern():
stix2.v21.Indicator(
indicator_types=['malicious-activity'],
pattern="file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e'",
pattern_type="stix",
valid_from="2017-01-01T12:34:56Z",
)
assert excinfo.value.cls == stix2.v21.Indicator
@ -194,6 +199,7 @@ def test_invalid_indicator_pattern():
stix2.v21.Indicator(
indicator_types=['malicious-activity'],
pattern='[file:hashes.MD5 = "d41d8cd98f00b204e9800998ecf8427e"]',
pattern_type="stix",
valid_from="2017-01-01T12:34:56Z",
)
assert excinfo.value.cls == stix2.v21.Indicator

View File

@ -71,3 +71,18 @@ def test_language_content_campaign():
# or https://docs.python.org/2/library/json.html#json.dumps
assert lc.serialize(pretty=True, ensure_ascii=False) == TEST_LANGUAGE_CONTENT
assert lc.modified == camp.modified
def test_object_modified_optional():
"""
object_modified is now optional in STIX 2.1.
"""
stix2.v21.LanguageContent(
object_ref=CAMPAIGN_ID,
contents={
"en": {
"name": "the english text",
},
},
)

View File

@ -5,6 +5,7 @@ import pytest
import pytz
import stix2
import stix2.exceptions
from .constants import LOCATION_ID
@ -111,7 +112,7 @@ def test_parse_location(data):
],
)
def test_location_bad_latitude(data):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.parse(data)
assert "Invalid value for Location 'latitude'" in str(excinfo.value)
@ -140,7 +141,7 @@ def test_location_bad_latitude(data):
],
)
def test_location_bad_longitude(data):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.parse(data)
assert "Invalid value for Location 'longitude'" in str(excinfo.value)
@ -190,7 +191,7 @@ def test_location_properties_missing_when_precision_is_present(data):
],
)
def test_location_negative_precision(data):
with pytest.raises(ValueError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.parse(data)
assert "Invalid value for Location 'precision'" in str(excinfo.value)
@ -264,6 +265,15 @@ def test_location_lat_or_lon_dependency_missing(data, msg):
assert msg in str(excinfo.value)
def test_location_complex_presence_constraint():
with pytest.raises(stix2.exceptions.PropertyPresenceError):
stix2.parse({
"type": "location",
"spec_version": "2.1",
"id": LOCATION_ID,
})
def test_google_map_url_long_lat_provided():
expected_url = "https://www.google.com/maps/search/?api=1&query=41.862401%2C-87.616001"

View File

@ -6,6 +6,7 @@ import pytz
import stix2
from ...exceptions import InvalidValueError, PropertyPresenceError
from .constants import FAKE_TIME, MALWARE_ID, MALWARE_KWARGS
EXPECTED_MALWARE = """{
@ -79,7 +80,7 @@ def test_malware_required_properties():
stix2.v21.Malware()
assert excinfo.value.cls == stix2.v21.Malware
assert excinfo.value.properties == ["is_family", "malware_types", "name"]
assert excinfo.value.properties == ["is_family", "malware_types"]
def test_malware_required_property_name():
@ -87,7 +88,7 @@ def test_malware_required_property_name():
stix2.v21.Malware(malware_types=['ransomware'])
assert excinfo.value.cls == stix2.v21.Malware
assert excinfo.value.properties == ["is_family", "name"]
assert excinfo.value.properties == ["is_family"]
def test_cannot_assign_to_malware_attributes(malware):
@ -136,7 +137,7 @@ def test_parse_malware(data):
def test_parse_malware_invalid_labels():
data = re.compile('\\[.+\\]', re.DOTALL).sub('1', EXPECTED_MALWARE)
with pytest.raises(ValueError) as excinfo:
with pytest.raises(InvalidValueError) as excinfo:
stix2.parse(data)
assert "Invalid value for Malware 'malware_types'" in str(excinfo.value)
@ -175,3 +176,24 @@ def test_malware_invalid_last_before_first():
stix2.v21.Malware(first_seen="2017-01-01T12:34:56.000Z", last_seen="2017-01-01T12:33:56.000Z", **MALWARE_KWARGS)
assert "'last_seen' must be greater than or equal to 'first_seen'" in str(excinfo.value)
def test_malware_family_no_name():
with pytest.raises(PropertyPresenceError):
stix2.parse({
"type": "malware",
"id": MALWARE_ID,
"spec_version": "2.1",
"is_family": True,
"malware_types": ["a type"],
})
def test_malware_non_family_no_name():
stix2.parse({
"type": "malware",
"id": MALWARE_ID,
"spec_version": "2.1",
"is_family": False,
"malware_types": ["something"],
})

View File

@ -26,7 +26,9 @@ MALWARE_ANALYSIS_JSON = """{
"software--46a6a91d-1160-4867-a4d1-b14e080e4e5b"
],
"configuration_version": "1.7",
"module": "Super Analyzer",
"modules": [
"Super Analyzer"
],
"analysis_engine_version": "1.2",
"analysis_definition_version": "3.4",
"submitted": "2018-11-23T06:45:55.747Z",

View File

@ -3,6 +3,7 @@ import pytest
from stix2 import exceptions, markings
from stix2.v21 import TLP_AMBER, Malware
from ...exceptions import MarkingNotFoundError
from .constants import FAKE_TIME, MALWARE_ID
from .constants import MALWARE_KWARGS as MALWARE_KWARGS_CONST
from .constants import MARKING_IDS
@ -349,7 +350,7 @@ def test_remove_markings_bad_markings():
object_marking_refs=[MARKING_IDS[0], MARKING_IDS[1], MARKING_IDS[2]],
**MALWARE_KWARGS
)
with pytest.raises(AssertionError) as excinfo:
with pytest.raises(MarkingNotFoundError) as excinfo:
markings.remove_markings(before, [MARKING_IDS[4]], None)
assert str(excinfo.value) == "Marking ['%s'] was not found in Malware!" % MARKING_IDS[4]

View File

@ -305,7 +305,7 @@ def test_parse_artifact_valid(data):
)
def test_parse_artifact_invalid(data):
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.InvalidValueError):
stix2.parse(odata_str, version="2.1")
@ -534,11 +534,10 @@ def test_parse_email_message_with_at_least_one_error(data):
"4": "artifact",
"5": "file",
}
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.parse_observable(data, valid_refs, version='2.1')
assert excinfo.value.cls == stix2.v21.EmailMIMEComponent
assert excinfo.value.properties == ["body", "body_raw_ref"]
assert excinfo.value.cls == stix2.v21.EmailMessage
assert "At least one of the" in str(excinfo.value)
assert "must be populated" in str(excinfo.value)
@ -788,7 +787,7 @@ def test_file_example_with_NTFSExt():
def test_file_example_with_empty_NTFSExt():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.v21.File(
name="abc.txt",
extensions={
@ -796,8 +795,7 @@ def test_file_example_with_empty_NTFSExt():
},
)
assert excinfo.value.cls == stix2.v21.NTFSExt
assert excinfo.value.properties == sorted(list(stix2.NTFSExt._properties.keys()))
assert excinfo.value.cls == stix2.v21.File
def test_file_example_with_PDFExt():
@ -1152,14 +1150,12 @@ def test_process_example_empty_error():
def test_process_example_empty_with_extensions():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.v21.Process(extensions={
"windows-process-ext": {},
})
assert excinfo.value.cls == stix2.v21.WindowsProcessExt
properties_of_extension = list(stix2.v21.WindowsProcessExt._properties.keys())
assert excinfo.value.properties == sorted(properties_of_extension)
assert excinfo.value.cls == stix2.v21.Process
def test_process_example_windows_process_ext():
@ -1181,7 +1177,7 @@ def test_process_example_windows_process_ext():
def test_process_example_windows_process_ext_empty():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.v21.Process(
pid=1221,
extensions={
@ -1189,9 +1185,7 @@ def test_process_example_windows_process_ext_empty():
},
)
assert excinfo.value.cls == stix2.v21.WindowsProcessExt
properties_of_extension = list(stix2.v21.WindowsProcessExt._properties.keys())
assert excinfo.value.properties == sorted(properties_of_extension)
assert excinfo.value.cls == stix2.v21.Process
def test_process_example_extensions_empty():
@ -1324,7 +1318,7 @@ def test_user_account_unix_account_ext_example():
def test_windows_registry_key_example():
with pytest.raises(ValueError):
with pytest.raises(stix2.exceptions.InvalidValueError):
stix2.v21.WindowsRegistryValueType(
name="Foo",
data="qwerty",
@ -1381,3 +1375,19 @@ def test_new_version_with_related_objects():
new_version = data.new_version(last_observed="2017-12-12T12:00:00Z")
assert new_version.last_observed.year == 2017
assert new_version.objects['domain'].resolves_to_refs[0] == 'src_ip'
def test_objects_deprecation():
with pytest.warns(stix2.exceptions.STIXDeprecationWarning):
stix2.v21.ObservedData(
first_observed="2016-03-12T12:00:00Z",
last_observed="2016-03-12T12:00:00Z",
number_observed=1,
objects={
"0": {
"type": "file",
"name": "foo",
},
},
)

View File

@ -23,10 +23,10 @@ EXPECTED_OPINION = """{
"created": "2016-05-12T08:17:27.000Z",
"modified": "2016-05-12T08:17:27.000Z",
"explanation": "%s",
"opinion": "strongly-disagree",
"object_refs": [
"relationship--16d2358f-3b0d-4c88-b047-0da2f7ed4471"
],
"opinion": "strongly-disagree"
]
}""" % EXPLANATION
EXPECTED_OPINION_REPR = "Opinion(" + " ".join((
@ -37,8 +37,9 @@ EXPECTED_OPINION_REPR = "Opinion(" + " ".join((
created='2016-05-12T08:17:27.000Z',
modified='2016-05-12T08:17:27.000Z',
explanation="%s",
object_refs=['relationship--16d2358f-3b0d-4c88-b047-0da2f7ed4471'],
opinion='strongly-disagree'""" % EXPLANATION
opinion='strongly-disagree',
object_refs=['relationship--16d2358f-3b0d-4c88-b047-0da2f7ed4471']
""" % EXPLANATION
).split()) + ")"

View File

@ -1,7 +1,9 @@
import pytest
import stix2
from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
from stix2.exceptions import (
AtLeastOnePropertyError, CustomContentError, DictionaryKeyError,
)
from stix2.properties import (
BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty,
@ -474,23 +476,27 @@ def test_extension_property_valid():
})
@pytest.mark.parametrize(
"data", [
1,
{'foobar-ext': {
'pe_type': 'exe',
}},
],
)
def test_extension_property_invalid(data):
def test_extension_property_invalid1():
ext_prop = ExtensionsProperty(spec_version='2.1', enclosing_type='file')
with pytest.raises(ValueError):
ext_prop.clean(data)
ext_prop.clean(1)
def test_extension_property_invalid2():
ext_prop = ExtensionsProperty(spec_version='2.1', enclosing_type='file')
with pytest.raises(CustomContentError):
ext_prop.clean(
{
'foobar-ext': {
'pe_type': 'exe',
},
},
)
def test_extension_property_invalid_type():
ext_prop = ExtensionsProperty(spec_version='2.1', enclosing_type='indicator')
with pytest.raises(ValueError) as excinfo:
with pytest.raises(CustomContentError) as excinfo:
ext_prop.clean(
{
'windows-pebinary-ext': {

View File

@ -4,6 +4,7 @@ import pytest
import pytz
import stix2
import stix2.v21
from .constants import IDENTITY_ID, THREAT_ACTOR_ID
@ -67,4 +68,26 @@ def test_parse_threat_actor(data):
assert actor.name == "Evil Org"
assert actor.threat_actor_types == ["crime-syndicate"]
def test_seen_ordering_constraint():
"""
Test first_seen/last_seen value co-constraint.
"""
with pytest.raises(ValueError):
stix2.v21.ThreatActor(
name="Bad Person",
threat_actor_types=["bad person", "evil person"],
first_seen="2010-04-21T09:31:11Z",
last_seen="2009-02-06T03:39:31Z",
)
# equal timestamps is okay.
stix2.v21.ThreatActor(
name="Bad Person",
threat_actor_types=["bad person", "evil person"],
first_seen="2010-04-21T09:31:11Z",
last_seen="2010-04-21T09:31:11Z",
)
# TODO: Add other examples

View File

@ -1,331 +0,0 @@
import os
import pytest
import stix2
from stix2.workbench import (
AttackPattern, Campaign, CourseOfAction, ExternalReference,
FileSystemSource, Filter, Identity, Indicator, IntrusionSet, Malware,
MarkingDefinition, ObservedData, Relationship, Report, StatementMarking,
ThreatActor, Tool, Vulnerability, add_data_source, all_versions,
attack_patterns, campaigns, courses_of_action, create, get, identities,
indicators, intrusion_sets, malware, observed_data, query, reports, save,
set_default_created, set_default_creator, set_default_external_refs,
set_default_object_marking_refs, threat_actors, tools, vulnerabilities,
)
from .constants import (
ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS,
COURSE_OF_ACTION_ID, COURSE_OF_ACTION_KWARGS, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID, INTRUSION_SET_KWARGS,
MALWARE_ID, MALWARE_KWARGS, OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS,
REPORT_ID, REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, TOOL_ID,
TOOL_KWARGS, VULNERABILITY_ID, VULNERABILITY_KWARGS,
)
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_environment():
# Create a STIX object
ind = create(Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
save(ind)
resp = get(INDICATOR_ID)
assert resp['indicator_types'][0] == 'malicious-activity'
resp = all_versions(INDICATOR_ID)
assert len(resp) == 1
# Search on something other than id
q = [Filter('type', '=', 'vulnerability')]
resp = query(q)
assert len(resp) == 0
def test_workbench_get_all_attack_patterns():
mal = AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
save(mal)
resp = attack_patterns()
assert len(resp) == 1
assert resp[0].id == ATTACK_PATTERN_ID
def test_workbench_get_all_campaigns():
cam = Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
save(cam)
resp = campaigns()
assert len(resp) == 1
assert resp[0].id == CAMPAIGN_ID
def test_workbench_get_all_courses_of_action():
coa = CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
save(coa)
resp = courses_of_action()
assert len(resp) == 1
assert resp[0].id == COURSE_OF_ACTION_ID
def test_workbench_get_all_identities():
idty = Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
save(idty)
resp = identities()
assert len(resp) == 1
assert resp[0].id == IDENTITY_ID
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_get_all_indicators():
resp = indicators()
assert len(resp) == 1
assert resp[0].id == INDICATOR_ID
def test_workbench_get_all_intrusion_sets():
ins = IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
save(ins)
resp = intrusion_sets()
assert len(resp) == 1
assert resp[0].id == INTRUSION_SET_ID
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_get_all_malware():
mal = Malware(id=MALWARE_ID, **MALWARE_KWARGS)
save(mal)
resp = malware()
assert len(resp) == 1
assert resp[0].id == MALWARE_ID
def test_workbench_get_all_observed_data():
od = ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
save(od)
resp = observed_data()
assert len(resp) == 1
assert resp[0].id == OBSERVED_DATA_ID
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_get_all_reports():
rep = Report(id=REPORT_ID, **REPORT_KWARGS)
save(rep)
resp = reports()
assert len(resp) == 1
assert resp[0].id == REPORT_ID
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_get_all_threat_actors():
thr = ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
save(thr)
resp = threat_actors()
assert len(resp) == 1
assert resp[0].id == THREAT_ACTOR_ID
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_get_all_tools():
tool = Tool(id=TOOL_ID, **TOOL_KWARGS)
save(tool)
resp = tools()
assert len(resp) == 1
assert resp[0].id == TOOL_ID
def test_workbench_get_all_vulnerabilities():
vuln = Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
save(vuln)
resp = vulnerabilities()
assert len(resp) == 1
assert resp[0].id == VULNERABILITY_ID
def test_workbench_add_to_bundle():
vuln = Vulnerability(**VULNERABILITY_KWARGS)
bundle = stix2.v21.Bundle(vuln)
assert bundle.objects[0].name == 'Heartbleed'
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_relationships():
rel = Relationship(INDICATOR_ID, 'indicates', MALWARE_ID)
save(rel)
ind = get(INDICATOR_ID)
resp = ind.relationships()
assert len(resp) == 1
assert resp[0].relationship_type == 'indicates'
assert resp[0].source_ref == INDICATOR_ID
assert resp[0].target_ref == MALWARE_ID
def test_workbench_created_by():
intset = IntrusionSet(name="Breach 123", created_by_ref=IDENTITY_ID)
save(intset)
creator = intset.created_by()
assert creator.id == IDENTITY_ID
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_related():
rel1 = Relationship(MALWARE_ID, 'targets', IDENTITY_ID)
rel2 = Relationship(CAMPAIGN_ID, 'uses', MALWARE_ID)
save([rel1, rel2])
resp = get(MALWARE_ID).related()
assert len(resp) == 3
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
assert any(x['id'] == IDENTITY_ID for x in resp)
resp = get(MALWARE_ID).related(relationship_type='indicates')
assert len(resp) == 1
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_workbench_related_with_filters():
malware = Malware(
malware_types=["ransomware"], name="CryptorBit",
created_by_ref=IDENTITY_ID, is_family=False,
)
rel = Relationship(malware.id, 'variant-of', MALWARE_ID)
save([malware, rel])
filters = [Filter('created_by_ref', '=', IDENTITY_ID)]
resp = get(MALWARE_ID).related(filters=filters)
assert len(resp) == 1
assert resp[0].name == malware.name
assert resp[0].created_by_ref == IDENTITY_ID
# filters arg can also be single filter
resp = get(MALWARE_ID).related(filters=filters[0])
assert len(resp) == 1
@pytest.mark.xfail(reason='The workbench is not working correctly for 2.1')
def test_add_data_source():
fs_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
fs = FileSystemSource(fs_path)
add_data_source(fs)
resp = tools()
assert len(resp) == 3
resp_ids = [tool.id for tool in resp]
assert TOOL_ID in resp_ids
assert 'tool--03342581-f790-4f03-ba41-e82e67392e23' in resp_ids
assert 'tool--242f3da3-4425-4d11-8f5c-b842886da966' in resp_ids
def test_additional_filter():
resp = tools(Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'))
assert len(resp) == 2
def test_additional_filters_list():
resp = tools([
Filter('created_by_ref', '=', 'identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5'),
Filter('name', '=', 'Windows Credential Editor'),
])
assert len(resp) == 1
def test_default_creator():
set_default_creator(IDENTITY_ID)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert 'created_by_ref' not in CAMPAIGN_KWARGS
assert campaign.created_by_ref == IDENTITY_ID
def test_default_created_timestamp():
timestamp = "2018-03-19T01:02:03.000Z"
set_default_created(timestamp)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert 'created' not in CAMPAIGN_KWARGS
assert stix2.utils.format_datetime(campaign.created) == timestamp
assert stix2.utils.format_datetime(campaign.modified) == timestamp
def test_default_external_refs():
ext_ref = ExternalReference(
source_name="ACME Threat Intel",
description="Threat report",
)
set_default_external_refs(ext_ref)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert campaign.external_references[0].source_name == "ACME Threat Intel"
assert campaign.external_references[0].description == "Threat report"
def test_default_object_marking_refs():
stmt_marking = StatementMarking("Copyright 2016, Example Corp")
mark_def = MarkingDefinition(
definition_type="statement",
definition=stmt_marking,
)
set_default_object_marking_refs(mark_def)
campaign = Campaign(**CAMPAIGN_KWARGS)
assert campaign.object_marking_refs[0] == mark_def.id
def test_workbench_custom_property_object_in_observable_extension():
ntfs = stix2.v21.NTFSExt(
allow_custom=True,
sid=1,
x_foo='bar',
)
artifact = stix2.v21.File(
name='test',
extensions={'ntfs-ext': ntfs},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=1,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)
def test_workbench_custom_property_dict_in_observable_extension():
artifact = stix2.v21.File(
allow_custom=True,
name='test',
extensions={
'ntfs-ext': {
'allow_custom': True,
'sid': 1,
'x_foo': 'bar',
},
},
)
observed_data = ObservedData(
allow_custom=True,
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=1,
objects={"0": artifact},
)
assert observed_data.objects['0'].extensions['ntfs-ext'].x_foo == "bar"
assert '"x_foo": "bar"' in str(observed_data)

View File

@ -79,7 +79,7 @@ class LanguageContent(_STIXBase):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('object_ref', ReferenceProperty(spec_version='2.1', required=True)),
# TODO: 'object_modified' it MUST be an exact match for the modified time of the STIX Object (SRO or SDO) being referenced.
('object_modified', TimestampProperty(required=True, precision='millisecond')),
('object_modified', TimestampProperty(precision='millisecond')),
# TODO: 'contents' https://docs.google.com/document/d/1ShNq4c3e1CkfANmD9O--mdZ5H0O_GLnjN28a_yrEaco/edit#heading=h.cfz5hcantmvx
('contents', DictionaryProperty(spec_version='2.1', required=True)),
('revoked', BooleanProperty()),

View File

@ -2,11 +2,13 @@
from collections import OrderedDict
import itertools
import warnings
from six.moves.urllib.parse import quote_plus
from ..core import STIXDomainObject
from ..custom import _custom_object_builder
from ..exceptions import PropertyPresenceError, STIXDeprecationWarning
from ..properties import (
BinaryProperty, BooleanProperty, EmbeddedObjectProperty, EnumProperty,
FloatProperty, IDProperty, IntegerProperty, ListProperty,
@ -33,6 +35,7 @@ class AttackPattern(STIXDomainObject):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('description', StringProperty()),
('aliases', ListProperty(StringProperty)),
('kill_chain_phases', ListProperty(KillChainPhase)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
@ -74,7 +77,7 @@ class Campaign(STIXDomainObject):
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(Campaign, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
@ -146,7 +149,7 @@ class Grouping(STIXDomainObject):
('name', StringProperty()),
('description', StringProperty()),
('context', StringProperty(required=True)),
('object_refs', ListProperty(ReferenceProperty)),
('object_refs', ListProperty(ReferenceProperty, required=True)),
])
@ -198,6 +201,8 @@ class Indicator(STIXDomainObject):
('description', StringProperty()),
('indicator_types', ListProperty(StringProperty, required=True)),
('pattern', PatternProperty(required=True)),
('pattern_type', StringProperty(required=True)),
('pattern_version', StringProperty()),
('valid_from', TimestampProperty(default=lambda: NOW, required=True)),
('valid_until', TimestampProperty()),
('kill_chain_phases', ListProperty(KillChainPhase)),
@ -211,7 +216,7 @@ class Indicator(STIXDomainObject):
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(Indicator, self)._check_object_constraints()
valid_from = self.get('valid_from')
valid_until = self.get('valid_until')
@ -245,13 +250,14 @@ class Infrastructure(STIXDomainObject):
('name', StringProperty(required=True)),
('description', StringProperty()),
('infrastructure_types', ListProperty(StringProperty, required=True)),
('aliases', ListProperty(StringProperty)),
('kill_chain_phases', ListProperty(KillChainPhase)),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(Infrastructure, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
@ -294,7 +300,7 @@ class IntrusionSet(STIXDomainObject):
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(IntrusionSet, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
@ -318,6 +324,7 @@ class Location(STIXDomainObject):
('created_by_ref', ReferenceProperty(type='identity', spec_version='2.1')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty()),
('description', StringProperty()),
('latitude', FloatProperty(min=-90.0, max=90.0)),
('longitude', FloatProperty(min=-180.0, max=180.0)),
@ -338,7 +345,7 @@ class Location(STIXDomainObject):
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(Location, self)._check_object_constraints()
if self.get('precision') is not None:
self._check_properties_dependency(['longitude', 'latitude'], ['precision'])
@ -346,6 +353,20 @@ class Location(STIXDomainObject):
self._check_properties_dependency(['latitude'], ['longitude'])
self._check_properties_dependency(['longitude'], ['latitude'])
if not (
'region' in self
or 'country' in self
or (
'latitude' in self
and 'longitude' in self
)
):
raise PropertyPresenceError(
"Location objects must have the properties 'region', "
"'country', or 'latitude' and 'longitude'",
Location,
)
def to_maps_url(self, map_engine="Google Maps"):
"""Return URL to this location in an online map engine.
@ -411,7 +432,7 @@ class Malware(STIXDomainObject):
('created_by_ref', ReferenceProperty(type='identity', spec_version='2.1')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('name', StringProperty(required=True)),
('name', StringProperty()),
('description', StringProperty()),
('malware_types', ListProperty(StringProperty, required=True)),
('is_family', BooleanProperty(required=True)),
@ -434,7 +455,7 @@ class Malware(STIXDomainObject):
])
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(Malware, self)._check_object_constraints()
first_seen = self.get('first_seen')
last_seen = self.get('last_seen')
@ -443,6 +464,12 @@ class Malware(STIXDomainObject):
msg = "{0.id} 'last_seen' must be greater than or equal to 'first_seen'"
raise ValueError(msg.format(self))
if self.is_family and "name" not in self:
raise PropertyPresenceError(
"'name' is a required property for malware families",
Malware,
)
class MalwareAnalysis(STIXDomainObject):
# TODO: Add link
@ -471,7 +498,7 @@ class MalwareAnalysis(STIXDomainObject):
('operating_system_ref', ReferenceProperty(type='software', spec_version='2.1')),
('installed_software_refs', ListProperty(ReferenceProperty(type='software', spec_version='2.1'))),
('configuration_version', StringProperty()),
('module', StringProperty()),
('modules', ListProperty(StringProperty)),
('analysis_engine_version', StringProperty()),
('analysis_definition_version', StringProperty()),
('submitted', TimestampProperty()),
@ -547,10 +574,17 @@ class ObservedData(STIXDomainObject):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
if "objects" in kwargs:
warnings.warn(
"The 'objects' property of observed-data is deprecated in "
"STIX 2.1.",
STIXDeprecationWarning,
)
super(ObservedData, self).__init__(*args, **kwargs)
def _check_object_constraints(self):
super(self.__class__, self)._check_object_constraints()
super(ObservedData, self)._check_object_constraints()
first_observed = self.get('first_observed')
last_observed = self.get('last_observed')
@ -580,7 +614,6 @@ class Opinion(STIXDomainObject):
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('explanation', StringProperty()),
('authors', ListProperty(StringProperty)),
('object_refs', ListProperty(ReferenceProperty, required=True)),
(
'opinion', EnumProperty(
allowed=[
@ -592,6 +625,7 @@ class Opinion(STIXDomainObject):
], required=True,
),
),
('object_refs', ListProperty(ReferenceProperty, required=True)),
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
@ -649,6 +683,8 @@ class ThreatActor(STIXDomainObject):
('description', StringProperty()),
('threat_actor_types', ListProperty(StringProperty, required=True)),
('aliases', ListProperty(StringProperty)),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('roles', ListProperty(StringProperty)),
('goals', ListProperty(StringProperty)),
('sophistication', StringProperty()),
@ -665,6 +701,16 @@ class ThreatActor(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)),
])
def _check_object_constraints(self):
super(ThreatActor, self)._check_object_constraints()
first_observed = self.get('first_seen')
last_observed = self.get('last_seen')
if first_observed and last_observed and last_observed < first_observed:
msg = "{0.id} 'last_seen' must be greater than or equal to 'first_seen'"
raise ValueError(msg.format(self))
class Tool(STIXDomainObject):
# TODO: Add link

View File

@ -80,6 +80,7 @@ class Sighting(STIXRelationshipObject):
('created_by_ref', ReferenceProperty(type='identity', spec_version='2.1')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('description', StringProperty()),
('first_seen', TimestampProperty()),
('last_seen', TimestampProperty()),
('count', IntegerProperty(min=0, max=999999999)),

View File

@ -20,6 +20,7 @@
"""
import functools
import stix2
from . import AttackPattern as _AttackPattern
from . import Campaign as _Campaign
@ -52,6 +53,11 @@ from . import ( # noqa: F401
)
from .datastore.filters import FilterSet
# Enable some adaptation to the current default supported STIX version.
_STIX_VID = "v" + stix2.DEFAULT_VERSION.replace(".", "")
# Use an implicit MemoryStore
_environ = Environment(store=MemoryStore())
@ -116,48 +122,39 @@ def _related_wrapper(self, *args, **kwargs):
return _environ.related_to(self, *args, **kwargs)
def _observed_data_init(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(self.__class__, self).__init__(*args, **kwargs)
def _constructor_wrapper(obj_type):
# Use an intermediate wrapper class so the implicit environment will create objects that have our wrapper functions
class_dict = dict(
created_by=_created_by_wrapper,
relationships=_relationships_wrapper,
related=_related_wrapper,
**obj_type.__dict__
)
# Avoid TypeError about super() in ObservedData
if 'ObservedData' in obj_type.__name__:
class_dict['__init__'] = _observed_data_init
wrapped_type = type(obj_type.__name__, obj_type.__bases__, class_dict)
@staticmethod
def new_constructor(cls, *args, **kwargs):
x = _environ.create(wrapped_type, *args, **kwargs)
return x
return new_constructor
def _setup_workbench():
# Create wrapper classes whose constructors call the implicit environment's create()
for obj_type in STIX_OBJS:
new_class_dict = {
'__new__': _constructor_wrapper(obj_type),
'__doc__': 'Workbench wrapper around the `{0} <stix2.v20.sdo.rst#stix2.v20.sdo.{0}>`__ object. {1}'.format(obj_type.__name__, STIX_OBJ_DOCS),
}
new_class = type(obj_type.__name__, (), new_class_dict)
# Add our new class to this module's globals and to the library-wide mapping.
# This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = new_class
stix2.OBJ_MAP[obj_type._type] = new_class
new_class = None
# The idea here was originally to dynamically create subclasses which
# were cleverly customized such that instantiating them would actually
# invoke _environ.create(). This turns out to be impossible, since
# __new__ can never create the class in the normal way, since that
# invokes __new__ again, resulting in infinite recursion. And
# _environ.create() does exactly that.
#
# So instead, we create something "class-like", in that calling it
# produces an instance of the desired class. But these things will
# be functions instead of classes. One might think this trickery will
# have undesirable side-effects, but actually it seems to work.
# So far...
new_class_dict = {
'__doc__': 'Workbench wrapper around the `{0} <stix2.{1}.sdo.rst#stix2.{1}.sdo.{0}>`__ object. {2}'.format(
obj_type.__name__,
_STIX_VID,
STIX_OBJ_DOCS,
),
'created_by': _created_by_wrapper,
'relationships': _relationships_wrapper,
'related': _related_wrapper,
}
new_class = type(obj_type.__name__, (obj_type,), new_class_dict)
factory_func = functools.partial(_environ.create, new_class)
# Add our new "class" to this module's globals and to the library-wide
# mapping. This allows parse() to use the wrapped classes.
globals()[obj_type.__name__] = factory_func
stix2.OBJ_MAP[obj_type._type] = factory_func
_setup_workbench()

View File

@ -11,9 +11,7 @@ deps =
taxii2-client
medallion
commands =
pytest --ignore=stix2/test/v20/test_workbench.py --ignore=stix2/test/v21/test_workbench.py --cov=stix2 stix2/test/ --cov-report term-missing
pytest stix2/test/v20/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append
pytest stix2/test/v21/test_workbench.py --cov=stix2 --cov-report term-missing --cov-append
python -m pytest --cov=stix2 stix2/test/ --cov-report term-missing -W ignore::stix2.exceptions.STIXDeprecationWarning
passenv = CI TRAVIS TRAVIS_*