Merge branch 'master' into datastores

stix2.1
Greg Back 2017-07-19 21:21:08 +00:00
commit bf9677094f
13 changed files with 818 additions and 19 deletions

View File

@ -1,13 +1,12 @@
- repo: https://github.com/pre-commit/pre-commit-hooks
sha: e626cd57090d8df0be21e4df0f4e55cc3511d6ab
sha: ea227f024bd89d638aea319c92806737e3375979
hooks:
- id: trailing-whitespace
- id: flake8
args:
- --max-line-length=160
- id: check-merge-conflict
- repo: https://github.com/FalconSocial/pre-commit-python-sorter
sha: 1.0.4
hooks:
- id: python-import-sorter
- repo: https://github.com/FalconSocial/pre-commit-python-sorter
sha: b57843b0b874df1d16eb0bef00b868792cb245c2
hooks:
- id: python-import-sorter

View File

@ -7,6 +7,8 @@
This repository provides Python APIs for serializing and de-serializing STIX 2 JSON content, along with higher-level APIs for common tasks, including data markings, versioning, and for resolving STIX IDs across multiple data sources.
For more information, see [the documentation](https://stix2.readthedocs.io/en/latest/) on ReadTheDocs.
## Installation
Install with [`pip`](https://pip.pypa.io/en/stable/):

View File

@ -4,6 +4,7 @@
from . import exceptions
from .bundle import Bundle
from .environment import ObjectFactory
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, CustomObservable, Directory,
DomainName, EmailAddress, EmailMessage,
@ -20,6 +21,23 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .patterns import (AndBooleanExpression, AndObservationExpression,
BasicObjectPathComponent, EqualityComparisonExpression,
FloatConstant, FollowedByObservationExpression,
GreaterThanComparisonExpression,
GreaterThanEqualComparisonExpression, HashConstant,
HexConstant, IntegerConstant,
IsSubsetComparisonExpression,
IsSupersetComparisonExpression,
LessThanComparisonExpression,
LessThanEqualComparisonExpression,
LikeComparisonExpression, ListConstant,
ListObjectPathComponent, MatchesComparisonExpression,
ObjectPath, ObservationExpression, OrBooleanExpression,
OrObservationExpression, ParentheticalExpression,
QualifiedObservationExpression,
ReferenceObjectPathComponent, RepeatQualifier,
StartStopQualifier, StringConstant, WithinQualifier)
from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject,
Identity, Indicator, IntrusionSet, Malware, ObservedData,
Report, ThreatActor, Tool, Vulnerability)

68
stix2/environment.py Normal file
View File

@ -0,0 +1,68 @@
import copy
class ObjectFactory(object):
"""Object Factory
Used to easily create STIX objects with default values for certain
properties.
Args:
created_by_ref: Default created_by_ref value to apply to all
objects created by this factory.
created: Default created value to apply to all
objects created by this factory.
external_references: Default `external_references` value to apply
to all objects created by this factory.
object_marking_refs: Default `object_marking_refs` value to apply
to all objects created by this factory.
list_append: When a default is set for a list property like
`external_references` or `object_marking_refs` and a value for
that property is passed into `create()`, if this is set to True,
that value will be added to the list alongside the default. If
this is set to False, the passed in value will replace the
default. Defaults to True.
"""
def __init__(self, created_by_ref=None, created=None,
external_references=None, object_marking_refs=None,
list_append=True):
self._defaults = {}
if created_by_ref:
self._defaults['created_by_ref'] = created_by_ref
if created:
self._defaults['created'] = created
# If the user provides a default "created" time, we also want to use
# that as the modified time.
self._defaults['modified'] = created
if external_references:
self._defaults['external_references'] = external_references
if object_marking_refs:
self._defaults['object_marking_refs'] = object_marking_refs
self._list_append = list_append
self._list_properties = ['external_references', 'object_marking_refs']
def create(self, cls, **kwargs):
# Use self.defaults as the base, but update with any explicit args
# provided by the user.
properties = copy.deepcopy(self._defaults)
if kwargs:
if self._list_append:
# Append provided items to list properties instead of replacing them
for list_prop in set(self._list_properties).intersection(kwargs.keys(), properties.keys()):
kwarg_prop = kwargs.pop(list_prop)
if kwarg_prop is None:
del properties[list_prop]
continue
if not isinstance(properties[list_prop], list):
properties[list_prop] = [properties[list_prop]]
if isinstance(kwarg_prop, list):
properties[list_prop].extend(kwarg_prop)
else:
properties[list_prop].append(kwarg_prop)
properties.update(**kwargs)
return cls(**properties)

View File

@ -123,7 +123,7 @@ class DomainName(_Observable):
class EmailAddress(_Observable):
_type = 'email-address'
_type = 'email-addr'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
@ -651,7 +651,7 @@ OBJ_MAP_OBSERVABLE = {
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-addr': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,

View File

@ -69,7 +69,7 @@ class MarkingProperty(Property):
class MarkingDefinition(_STIXBase):
_type = 'marking-definition'
_properties = {
'created': TimestampProperty(default=lambda: NOW, required=True),
'created': TimestampProperty(default=lambda: NOW),
'external_references': ListProperty(ExternalReference),
'created_by_ref': ReferenceProperty(type="identity"),
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),

419
stix2/patterns.py Normal file
View File

@ -0,0 +1,419 @@
import base64
import binascii
import re
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
class _Constant(object):
pass
class StringConstant(_Constant):
def __init__(self, value):
self.value = value
def __str__(self):
return "'%s'" % escape_quotes_and_backslashes(self.value)
class IntegerConstant(_Constant):
def __init__(self, value):
try:
self.value = int(value)
except Exception:
raise ValueError("must be an integer.")
def __str__(self):
return "%s" % self.value
class FloatConstant(_Constant):
def __init__(self, value):
try:
self.value = float(value)
except Exception:
raise ValueError("must be an float.")
def __str__(self):
return "%s" % self.value
class BooleanConstant(_Constant):
def __init__(self, value):
if isinstance(value, bool):
self.value = value
trues = ['true', 't']
falses = ['false', 'f']
try:
if value.lower() in trues:
self.value = True
if value.lower() in falses:
self.value = False
except AttributeError:
if value == 1:
self.value = True
if value == 0:
self.value = False
raise ValueError("must be a boolean value.")
def __str__(self):
return "%s" % self.value
_HASH_REGEX = {
"MD5": ("^[a-fA-F0-9]{32}$", "MD5"),
"MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"),
"RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"),
"SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"),
"SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"),
"SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"),
"SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"),
"SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"),
"SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"),
"SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"),
"SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"),
"SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"),
"SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"),
"WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"),
}
class HashConstant(StringConstant):
def __init__(self, value, type):
key = type.upper().replace('-', '')
if key in _HASH_REGEX:
vocab_key = _HASH_REGEX[key][1]
if not re.match(_HASH_REGEX[key][0], value):
raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key))
self.value = value
class BinaryConstant(_Constant):
def __init__(self, value):
try:
base64.b64decode(value)
self.value = value
except (binascii.Error, TypeError):
raise ValueError("must contain a base64 encoded string")
def __str__(self):
return "b'%s'" % self.value
class HexConstant(_Constant):
def __init__(self, value):
if not re.match('^([a-fA-F0-9]{2})+$', value):
raise ValueError("must contain an even number of hexadecimal characters")
self.value = value
def __str__(self):
return "h'%s'" % self.value
class ListConstant(_Constant):
def __init__(self, values):
self.value = values
def __str__(self):
return "(" + ", ".join([("%s" % x) for x in self.value]) + ")"
def make_constant(value):
if isinstance(value, str):
return StringConstant(value)
elif isinstance(value, int):
return IntegerConstant(value)
elif isinstance(value, float):
return FloatConstant(value)
elif isinstance(value, list):
return ListConstant(value)
elif isinstance(value, bool):
return BooleanConstant(value)
else:
raise ValueError("Unable to create a constant from %s" % value)
class _ObjectPathComponent(object):
@staticmethod
def create_ObjectPathComponent(component_name):
if component_name.endswith("_ref"):
return ReferenceObjectPathComponent(component_name)
elif component_name.find("[") != -1:
parse1 = component_name.split("[")
return ListObjectPathComponent(parse1[0], parse1[1][:-1])
else:
return BasicObjectPathComponent(component_name)
class BasicObjectPathComponent(_ObjectPathComponent):
def __init__(self, property_name, is_key=False):
self.property_name = property_name
# TODO: set is_key to True if this component is a dictionary key
# self.is_key = is_key
def __str__(self):
return self.property_name
class ListObjectPathComponent(_ObjectPathComponent):
def __init__(self, property_name, index):
self.property_name = property_name
self.index = index
def __str__(self):
return "%s[%s]" % (self.property_name, self.index)
class ReferenceObjectPathComponent(_ObjectPathComponent):
def __init__(self, reference_property_name):
self.property_name = reference_property_name
def __str__(self):
return self.property_name
class ObjectPath(object):
def __init__(self, object_type_name, property_path):
self.object_type_name = object_type_name
self.property_path = [x if isinstance(x, _ObjectPathComponent) else
_ObjectPathComponent.create_ObjectPathComponent(x)
for x in property_path]
def __str__(self):
return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path]))
def merge(self, other):
self.property_path.extend(other.property_path)
return self
@staticmethod
def make_object_path(lhs):
path_as_parts = lhs.split(":")
return ObjectPath(path_as_parts[0], path_as_parts[1].split("."))
class _PatternExpression(object):
@staticmethod
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
class _ComparisonExpression(_PatternExpression):
def __init__(self, operator, lhs, rhs, negated=False):
if operator == "=" and isinstance(rhs, ListConstant):
self.operator = "IN"
else:
self.operator = operator
if isinstance(lhs, ObjectPath):
self.lhs = lhs
else:
self.lhs = ObjectPath.make_object_path(lhs)
if isinstance(rhs, _Constant):
self.rhs = rhs
else:
self.rhs = make_constant(rhs)
self.negated = negated
self.root_type = self.lhs.object_type_name
def __str__(self):
# if isinstance(self.rhs, list):
# final_rhs = []
# for r in self.rhs:
# final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'")
# rhs_string = "(" + ", ".join(final_rhs) + ")"
# else:
# rhs_string = self.rhs
if self.negated:
return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs)
else:
return "%s %s %s" % (self.lhs, self.operator, self.rhs)
class EqualityComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated)
class GreaterThanComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated)
class LessThanComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated)
class GreaterThanEqualComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(GreaterThanEqualComparisonExpression, self).__init__(">=", lhs, rhs, negated)
class LessThanEqualComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LessThanEqualComparisonExpression, self).__init__("<=", lhs, rhs, negated)
class InComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated)
class LikeComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated)
class MatchesComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated)
class IsSubsetComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated)
class IsSupersetComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated)
class _BooleanExpression(_PatternExpression):
def __init__(self, operator, operands):
self.operator = operator
self.operands = []
for arg in operands:
if not hasattr(self, "root_type"):
self.root_type = arg.root_type
elif self.root_type and (self.root_type != arg.root_type) and operator == "AND":
raise ValueError("All operands to an 'AND' expression must have the same object type")
elif self.root_type and (self.root_type != arg.root_type):
self.root_type = None
self.operands.append(arg)
def __str__(self):
sub_exprs = []
for o in self.operands:
sub_exprs.append("%s" % o)
return (" " + self.operator + " ").join(sub_exprs)
class AndBooleanExpression(_BooleanExpression):
def __init__(self, operands):
super(AndBooleanExpression, self).__init__("AND", operands)
class OrBooleanExpression(_BooleanExpression):
def __init__(self, operands):
super(OrBooleanExpression, self).__init__("OR", operands)
class ObservationExpression(_PatternExpression):
def __init__(self, operand):
self.operand = operand
def __str__(self):
return "[%s]" % self.operand
class _CompoundObservationExpression(_PatternExpression):
def __init__(self, operator, operands):
self.operator = operator
self.operands = operands
def __str__(self):
sub_exprs = []
for o in self.operands:
sub_exprs.append("%s" % o)
return (" " + self.operator + " ").join(sub_exprs)
class AndObservationExpression(_CompoundObservationExpression):
def __init__(self, operands):
super(AndObservationExpression, self).__init__("AND", operands)
class OrObservationExpression(_CompoundObservationExpression):
def __init__(self, operands):
super(OrObservationExpression, self).__init__("OR", operands)
class FollowedByObservationExpression(_CompoundObservationExpression):
def __init__(self, operands):
super(FollowedByObservationExpression, self).__init__("FOLLOWEDBY", operands)
class ParentheticalExpression(_PatternExpression):
def __init__(self, exp):
self.expression = exp
if hasattr(exp, "root_type"):
self.root_type = exp.root_type
def __str__(self):
return "(%s)" % self.expression
class _ExpressionQualifier(_PatternExpression):
pass
class RepeatQualifier(_ExpressionQualifier):
def __init__(self, times_to_repeat):
if isinstance(times_to_repeat, IntegerConstant):
self.times_to_repeat = times_to_repeat
elif isinstance(times_to_repeat, int):
self.times_to_repeat = IntegerConstant(times_to_repeat)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat)
def __str__(self):
return "REPEATS %s TIMES" % self.times_to_repeat
class WithinQualifier(_ExpressionQualifier):
def __init__(self, number_of_seconds):
if isinstance(number_of_seconds, IntegerConstant):
self.number_of_seconds = number_of_seconds
elif isinstance(number_of_seconds, int):
self.number_of_seconds = IntegerConstant(number_of_seconds)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds)
def __str__(self):
return "WITHIN %s SECONDS" % self.number_of_seconds
class StartStopQualifier(_ExpressionQualifier):
def __init__(self, start_time, stop_time):
if isinstance(start_time, IntegerConstant):
self.start_time = start_time
elif isinstance(start_time, int):
self.start_time = IntegerConstant(start_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time)
if isinstance(stop_time, IntegerConstant):
self.stop_time = stop_time
elif isinstance(stop_time, int):
self.stop_time = IntegerConstant(stop_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time)
def __str__(self):
return "START %s STOP %s" % (self.start_time, self.stop_time)
class QualifiedObservationExpression(_PatternExpression):
def __init__(self, observation_expression, qualifier):
self.observation_expression = observation_expression
self.qualifier = qualifier
def __str__(self):
return "%s %s" % (self.observation_expression, self.qualifier)

View File

@ -5,7 +5,7 @@ import inspect
import re
import uuid
from six import text_type
from six import string_types, text_type
from .base import _STIXBase
from .exceptions import DictionaryKeyError
@ -101,12 +101,9 @@ class ListProperty(Property):
iter(value)
except TypeError:
raise ValueError("must be an iterable.")
try:
if isinstance(value, basestring):
value = [value]
except NameError:
if isinstance(value, str):
value = [value]
if isinstance(value, (_STIXBase, string_types)):
value = [value]
result = []
for item in value:

View File

@ -20,6 +20,12 @@ TOOL_ID = "tool--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f"
SIGHTING_ID = "sighting--bfbc19db-ec35-4e45-beed-f8bde2a772fb"
VULNERABILITY_ID = "vulnerability--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061"
# Minimum required args for an Identity instance
IDENTITY_KWARGS = dict(
name="John Smith",
identity_class="individual",
)
# Minimum required args for an Indicator instance
INDICATOR_KWARGS = dict(
labels=['malicious-activity'],

View File

@ -0,0 +1,83 @@
import stix2
from .constants import (FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_KWARGS)
def test_object_factory_created_by_ref_str():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.created_by_ref == IDENTITY_ID
def test_object_factory_created_by_ref_obj():
id_obj = stix2.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
factory = stix2.ObjectFactory(created_by_ref=id_obj)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.created_by_ref == IDENTITY_ID
def test_object_factory_override_default():
factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
new_id = "identity--983b3172-44fe-4a80-8091-eb8098841fe8"
ind = factory.create(stix2.Indicator, created_by_ref=new_id, **INDICATOR_KWARGS)
assert ind.created_by_ref == new_id
def test_object_factory_created():
factory = stix2.ObjectFactory(created=FAKE_TIME)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.created == FAKE_TIME
assert ind.modified == FAKE_TIME
def test_object_factory_external_resource():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report")
factory = stix2.ObjectFactory(external_references=ext_ref)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert ind.external_references[0].source_name == "ACME Threat Intel"
assert ind.external_references[0].description == "Threat report"
ind2 = factory.create(stix2.Indicator, external_references=None, **INDICATOR_KWARGS)
assert 'external_references' not in ind2
def test_object_factory_obj_markings():
stmt_marking = stix2.StatementMarking("Copyright 2016, Example Corp")
mark_def = stix2.MarkingDefinition(definition_type="statement",
definition=stmt_marking)
factory = stix2.ObjectFactory(object_marking_refs=[mark_def, stix2.TLP_AMBER])
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert mark_def.id in ind.object_marking_refs
assert stix2.TLP_AMBER.id in ind.object_marking_refs
factory = stix2.ObjectFactory(object_marking_refs=stix2.TLP_RED)
ind = factory.create(stix2.Indicator, **INDICATOR_KWARGS)
assert stix2.TLP_RED.id in ind.object_marking_refs
def test_object_factory_list_append():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report from ACME")
ext_ref2 = stix2.ExternalReference(source_name="Yet Another Threat Report",
description="Threat report from YATR")
ext_ref3 = stix2.ExternalReference(source_name="Threat Report #3",
description="One more threat report")
factory = stix2.ObjectFactory(external_references=ext_ref)
ind = factory.create(stix2.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
assert ind.external_references[1].source_name == "Yet Another Threat Report"
ind = factory.create(stix2.Indicator, external_references=[ext_ref2, ext_ref3], **INDICATOR_KWARGS)
assert ind.external_references[2].source_name == "Threat Report #3"
def test_object_factory_list_replace():
ext_ref = stix2.ExternalReference(source_name="ACME Threat Intel",
description="Threat report from ACME")
ext_ref2 = stix2.ExternalReference(source_name="Yet Another Threat Report",
description="Threat report from YATR")
factory = stix2.ObjectFactory(external_references=ext_ref, list_append=False)
ind = factory.create(stix2.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
assert len(ind.external_references) == 1
assert ind.external_references[0].source_name == "Yet Another Threat Report"

View File

@ -29,6 +29,19 @@ EXPECTED_STATEMENT_MARKING_DEFINITION = """{
"type": "marking-definition"
}"""
EXPECTED_CAMPAIGN_WITH_OBJECT_MARKING = """{
"created": "2016-04-06T20:03:00.000Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"description": "Campaign by Green Group against a series of targets in the financial services sector.",
"id": "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
"modified": "2016-04-06T20:03:00.000Z",
"name": "Green Group Attacks Against Finance",
"object_marking_refs": [
"marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"
],
"type": "campaign"
}"""
EXPECTED_GRANULAR_MARKING = """{
"marking_ref": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
"selectors": [
@ -84,6 +97,29 @@ def test_marking_def_example_with_positional_statement():
assert str(marking_definition) == EXPECTED_STATEMENT_MARKING_DEFINITION
def test_marking_def_invalid_type():
with pytest.raises(ValueError):
stix2.MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z",
definition_type="my-definiition-type",
definition=stix2.StatementMarking("Copyright 2016, Example Corp")
)
def test_campaign_with_markings_example():
campaign = stix2.Campaign(
id="campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T20:03:00Z",
modified="2016-04-06T20:03:00Z",
name="Green Group Attacks Against Finance",
description="Campaign by Green Group against a series of targets in the financial services sector.",
object_marking_refs=TLP_WHITE
)
assert str(campaign) == EXPECTED_CAMPAIGN_WITH_OBJECT_MARKING
def test_granular_example():
granular_marking = stix2.GranularMarking(
marking_ref="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
@ -119,7 +155,6 @@ def test_campaign_with_granular_markings_example():
marking_ref="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
selectors=["description"])
])
print(str(campaign))
assert str(campaign) == EXPECTED_CAMPAIGN_WITH_GRANULAR_MARKINGS

View File

@ -225,7 +225,7 @@ def test_parse_autonomous_system_valid(data):
@pytest.mark.parametrize("data", [
"""{
"type": "email-address",
"type": "email-addr",
"value": "john@example.com",
"display_name": "John Doe",
"belongs_to_ref": "0"
@ -233,7 +233,7 @@ def test_parse_autonomous_system_valid(data):
])
def test_parse_email_address(data):
odata = stix2.parse_observable(data, {"0": "user-account"})
assert odata.type == "email-address"
assert odata.type == "email-addr"
odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data)
with pytest.raises(stix2.exceptions.InvalidObjRefError):

View File

@ -0,0 +1,172 @@
import stix2
def test_create_comparison_expression():
exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant("aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "SHA-256")) # noqa
assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'"
def test_boolean_expression():
exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value",
stix2.StringConstant(".+\\@example\\.com$"))
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$"))
exp = stix2.AndBooleanExpression([exp1, exp2])
assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa
def test_boolean_expression_with_parentheses():
exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message",
[stix2.ReferenceObjectPathComponent("from_ref"),
stix2.BasicObjectPathComponent("value")]),
stix2.StringConstant(".+\\@example\\.com$"))
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$"))
exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2]))
assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa
def test_hash_followed_by_registryKey_expression_python_constant():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
o_exp1 = stix2.ObservationExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
o_exp2 = stix2.ObservationExpression(reg_exp)
fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp)
qual_exp = stix2.WithinQualifier(300)
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa
def test_hash_followed_by_registryKey_expression():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
o_exp1 = stix2.ObservationExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
o_exp2 = stix2.ObservationExpression(reg_exp)
fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp)
qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300))
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa
def test_file_observable_expression():
exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256'))
exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf"))
bool_exp = stix2.AndBooleanExpression([exp1, exp2])
exp = stix2.ObservationExpression(bool_exp)
assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa
def test_multiple_file_observable_expression():
exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant(
"bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c",
'SHA-256'))
exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("cead3f77f6cda6ec00f57d76c9a6879f", "MD5"))
bool1_exp = stix2.OrBooleanExpression([exp1, exp2])
exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256'))
op1_exp = stix2.ObservationExpression(bool1_exp)
op2_exp = stix2.ObservationExpression(exp3)
exp = stix2.AndObservationExpression([op1_exp, op2_exp])
assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa
def test_root_types():
ast = stix2.ObservationExpression(
stix2.AndBooleanExpression(
[stix2.ParentheticalExpression(
stix2.OrBooleanExpression([
stix2.EqualityComparisonExpression("a:b", stix2.StringConstant("1")),
stix2.EqualityComparisonExpression("b:c", stix2.StringConstant("2"))])),
stix2.EqualityComparisonExpression(u"b:d", stix2.StringConstant("3"))]))
assert str(ast) == "[(a:b = '1' OR b:c = '2') AND b:d = '3']"
def test_artifact_payload():
exp1 = stix2.EqualityComparisonExpression("artifact:mime_type",
"application/vnd.tcpdump.pcap")
exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin",
stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00"))
and_exp = stix2.AndBooleanExpression([exp1, exp2])
exp = stix2.ObservationExpression(and_exp)
assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa
def test_greater_than_python_constant():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
7.0)
exp = stix2.ObservationExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
def test_greater_than():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
stix2.FloatConstant(7.0))
exp = stix2.ObservationExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
def test_and_observable_expression():
exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1007")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Peter")])
exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1008")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Paul")])
exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1009")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Mary")])
exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1),
stix2.ObservationExpression(exp2),
stix2.ObservationExpression(exp3)])
assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa
def test_hex():
exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type",
"image/bmp"),
stix2.EqualityComparisonExpression("file:magic_number_hex",
stix2.HexConstant("ffd8"))])
exp = stix2.ObservationExpression(exp_and)
assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']"
def test_multiple_qualifiers():
exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type",
"domain-name"),
stix2.EqualityComparisonExpression("network-traffic:dst_ref.value",
"example.com")])
exp_ob = stix2.ObservationExpression(exp_and)
qual_rep = stix2.RepeatQualifier(5)
qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800))
exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within)
assert str(exp) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" # noqa
def test_set_op():
exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64"))
assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"