Merge pull request #229 from oasis-open/add_visitor2

Add an AST for STIX pattern navigation. From add_visitor2
master
Emmanuelle Vargas-Gonzalez 2018-12-11 09:00:34 -05:00 committed by GitHub
commit 605842001f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 709 additions and 162 deletions

28
.gitignore vendored
View File

@ -68,3 +68,31 @@ cache.sqlite
# PyCharm
.idea/
### macOS template
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk

View File

@ -2,6 +2,7 @@
skip = workbench.py
not_skip = __init__.py
known_third_party =
antlr4,
dateutil,
medallion,
pytest,

349
stix2/pattern_visitor.py Normal file
View File

@ -0,0 +1,349 @@
import importlib
import inspect
from antlr4 import CommonTokenStream, InputStream
import six
from stix2patterns.grammars.STIXPatternLexer import STIXPatternLexer
from stix2patterns.grammars.STIXPatternParser import (STIXPatternParser,
TerminalNode)
from stix2patterns.grammars.STIXPatternVisitor import STIXPatternVisitor
from stix2patterns.validator import STIXPatternErrorListener
from .patterns import *
from .patterns import _BooleanExpression
# flake8: noqa F405
def collapse_lists(lists):
result = []
for c in lists:
if isinstance(c, list):
result.extend(c)
else:
result.append(c)
return result
def remove_terminal_nodes(parse_tree_nodes):
values = []
for x in parse_tree_nodes:
if not isinstance(x, TerminalNode):
values.append(x)
return values
# This class defines a complete generic visitor for a parse tree produced by STIXPatternParser.
class STIXPatternVisitorForSTIX2(STIXPatternVisitor):
classes = {}
def __init__(self, module_suffix, module_name):
if module_suffix and module_name:
self.module_suffix = module_suffix
if not STIXPatternVisitorForSTIX2.classes:
module = importlib.import_module(module_name)
for k, c in inspect.getmembers(module, inspect.isclass):
STIXPatternVisitorForSTIX2.classes[k] = c
else:
self.module_suffix = None
super(STIXPatternVisitor, self).__init__()
def get_class(self, class_name):
if class_name in STIXPatternVisitorForSTIX2.classes:
return STIXPatternVisitorForSTIX2.classes[class_name]
else:
return None
def instantiate(self, klass_name, *args):
klass_to_instantiate = None
if self.module_suffix:
klass_to_instantiate = self.get_class(klass_name + "For" + self.module_suffix)
if not klass_to_instantiate:
# use the classes in python_stix2
klass_to_instantiate = globals()[klass_name]
return klass_to_instantiate(*args)
# Visit a parse tree produced by STIXPatternParser#pattern.
def visitPattern(self, ctx):
children = self.visitChildren(ctx)
return children[0]
# Visit a parse tree produced by STIXPatternParser#observationExpressions.
def visitObservationExpressions(self, ctx):
children = self.visitChildren(ctx)
if len(children) == 1:
return children[0]
else:
return FollowedByObservationExpression([children[0], children[2]])
# Visit a parse tree produced by STIXPatternParser#observationExpressionOr.
def visitObservationExpressionOr(self, ctx):
children = self.visitChildren(ctx)
if len(children) == 1:
return children[0]
else:
return self.instantiate("OrObservationExpression", [children[0], children[2]])
# Visit a parse tree produced by STIXPatternParser#observationExpressionAnd.
def visitObservationExpressionAnd(self, ctx):
children = self.visitChildren(ctx)
if len(children) == 1:
return children[0]
else:
return self.instantiate("AndObservationExpression", [children[0], children[2]])
# Visit a parse tree produced by STIXPatternParser#observationExpressionRepeated.
def visitObservationExpressionRepeated(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("QualifiedObservationExpression", children[0], children[1])
# Visit a parse tree produced by STIXPatternParser#observationExpressionSimple.
def visitObservationExpressionSimple(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("ObservationExpression", children[1])
# Visit a parse tree produced by STIXPatternParser#observationExpressionCompound.
def visitObservationExpressionCompound(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("ObservationExpression", children[1])
# Visit a parse tree produced by STIXPatternParser#observationExpressionWithin.
def visitObservationExpressionWithin(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("QualifiedObservationExpression", children[0], children[1])
# Visit a parse tree produced by STIXPatternParser#observationExpressionStartStop.
def visitObservationExpressionStartStop(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("QualifiedObservationExpression", children[0], children[1])
# Visit a parse tree produced by STIXPatternParser#comparisonExpression.
def visitComparisonExpression(self, ctx):
children = self.visitChildren(ctx)
if len(children) == 1:
return children[0]
else:
if isinstance(children[0], _BooleanExpression):
children[0].operands.append(children[2])
return children[0]
else:
return self.instantiate("OrBooleanExpression", [children[0], children[2]])
# Visit a parse tree produced by STIXPatternParser#comparisonExpressionAnd.
def visitComparisonExpressionAnd(self, ctx):
# TODO: NOT
children = self.visitChildren(ctx)
if len(children) == 1:
return children[0]
else:
if isinstance(children[0], _BooleanExpression):
children[0].operands.append(children[2])
return children[0]
else:
return self.instantiate("AndBooleanExpression", [children[0], children[2]])
# Visit a parse tree produced by STIXPatternParser#propTestEqual.
def visitPropTestEqual(self, ctx):
children = self.visitChildren(ctx)
operator = children[1].symbol.type
negated = operator != STIXPatternParser.EQ
return self.instantiate("EqualityComparisonExpression", children[0], children[3 if len(children) > 3 else 2],
negated)
# Visit a parse tree produced by STIXPatternParser#propTestOrder.
def visitPropTestOrder(self, ctx):
children = self.visitChildren(ctx)
operator = children[1].symbol.type
if operator == STIXPatternParser.GT:
return self.instantiate("GreaterThanComparisonExpression", children[0],
children[3 if len(children) > 3 else 2], False)
elif operator == STIXPatternParser.LT:
return self.instantiate("LessThanComparisonExpression", children[0],
children[3 if len(children) > 3 else 2], False)
elif operator == STIXPatternParser.GE:
return self.instantiate("GreaterThanEqualComparisonExpression", children[0],
children[3 if len(children) > 3 else 2], False)
elif operator == STIXPatternParser.LE:
return self.instantiate("LessThanEqualComparisonExpression", children[0],
children[3 if len(children) > 3 else 2], False)
# Visit a parse tree produced by STIXPatternParser#propTestSet.
def visitPropTestSet(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("InComparisonExpression", children[0], children[3 if len(children) > 3 else 2], False)
# Visit a parse tree produced by STIXPatternParser#propTestLike.
def visitPropTestLike(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("LikeComparisonExpression", children[0], children[3 if len(children) > 3 else 2], False)
# Visit a parse tree produced by STIXPatternParser#propTestRegex.
def visitPropTestRegex(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("MatchesComparisonExpression", children[0], children[3 if len(children) > 3 else 2],
False)
# Visit a parse tree produced by STIXPatternParser#propTestIsSubset.
def visitPropTestIsSubset(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("IsSubsetComparisonExpression", children[0], children[3 if len(children) > 3 else 2])
# Visit a parse tree produced by STIXPatternParser#propTestIsSuperset.
def visitPropTestIsSuperset(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("IsSupersetComparisonExpression", children[0], children[3 if len(children) > 3 else 2])
# Visit a parse tree produced by STIXPatternParser#propTestParen.
def visitPropTestParen(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("ParentheticalExpression", children[1])
# Visit a parse tree produced by STIXPatternParser#startStopQualifier.
def visitStartStopQualifier(self, ctx):
children = self.visitChildren(ctx)
return StartStopQualifier(children[1], children[3])
# Visit a parse tree produced by STIXPatternParser#withinQualifier.
def visitWithinQualifier(self, ctx):
children = self.visitChildren(ctx)
return WithinQualifier(children[1])
# Visit a parse tree produced by STIXPatternParser#repeatedQualifier.
def visitRepeatedQualifier(self, ctx):
children = self.visitChildren(ctx)
return RepeatQualifier(children[1])
# Visit a parse tree produced by STIXPatternParser#objectPath.
def visitObjectPath(self, ctx):
children = self.visitChildren(ctx)
flat_list = collapse_lists(children[2:])
property_path = []
i = 0
while i < len(flat_list):
current = flat_list[i]
if i == len(flat_list)-1:
property_path.append(current)
break
next = flat_list[i+1]
if isinstance(next, TerminalNode):
property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText()))
i += 2
else:
property_path.append(current)
i += 1
return self.instantiate("ObjectPath", children[0].getText(), property_path)
# Visit a parse tree produced by STIXPatternParser#objectType.
def visitObjectType(self, ctx):
children = self.visitChildren(ctx)
return children[0]
# Visit a parse tree produced by STIXPatternParser#firstPathComponent.
def visitFirstPathComponent(self, ctx):
children = self.visitChildren(ctx)
step = children[0].getText()
# if step.endswith("_ref"):
# return stix2.ReferenceObjectPathComponent(step)
# else:
return self.instantiate("BasicObjectPathComponent", step, False)
# Visit a parse tree produced by STIXPatternParser#indexPathStep.
def visitIndexPathStep(self, ctx):
children = self.visitChildren(ctx)
return children[1]
# Visit a parse tree produced by STIXPatternParser#pathStep.
def visitPathStep(self, ctx):
return collapse_lists(self.visitChildren(ctx))
# Visit a parse tree produced by STIXPatternParser#keyPathStep.
def visitKeyPathStep(self, ctx):
children = self.visitChildren(ctx)
if isinstance(children[1], StringConstant):
# special case for hashes
return children[1].value
else:
return self.instantiate("BasicObjectPathComponent", children[1].getText(), True)
# Visit a parse tree produced by STIXPatternParser#setLiteral.
def visitSetLiteral(self, ctx):
children = self.visitChildren(ctx)
return self.instantiate("ListConstant", remove_terminal_nodes(children))
# Visit a parse tree produced by STIXPatternParser#primitiveLiteral.
def visitPrimitiveLiteral(self, ctx):
children = self.visitChildren(ctx)
return children[0]
# Visit a parse tree produced by STIXPatternParser#orderableLiteral.
def visitOrderableLiteral(self, ctx):
children = self.visitChildren(ctx)
return children[0]
def visitTerminal(self, node):
if node.symbol.type == STIXPatternParser.IntPosLiteral or node.symbol.type == STIXPatternParser.IntNegLiteral:
return IntegerConstant(node.getText())
elif node.symbol.type == STIXPatternParser.FloatPosLiteral or node.symbol.type == STIXPatternParser.FloatNegLiteral:
return FloatConstant(node.getText())
elif node.symbol.type == STIXPatternParser.HexLiteral:
return HexConstant(node.getText(), from_parse_tree=True)
elif node.symbol.type == STIXPatternParser.BinaryLiteral:
return BinaryConstant(node.getText(), from_parse_tree=True)
elif node.symbol.type == STIXPatternParser.StringLiteral:
return StringConstant(node.getText().strip('\''), from_parse_tree=True)
elif node.symbol.type == STIXPatternParser.BoolLiteral:
return BooleanConstant(node.getText())
elif node.symbol.type == STIXPatternParser.TimestampLiteral:
return TimestampConstant(node.getText())
else:
return node
def aggregateResult(self, aggregate, nextResult):
if aggregate:
aggregate.append(nextResult)
elif nextResult:
aggregate = [nextResult]
return aggregate
def create_pattern_object(pattern, module_suffix="", module_name=""):
"""
Validates a pattern against the STIX Pattern grammar. Error messages are
returned in a list. The test passed if the returned list is empty.
"""
start = ''
if isinstance(pattern, six.string_types):
start = pattern[:2]
pattern = InputStream(pattern)
if not start:
start = pattern.readline()[:2]
pattern.seek(0)
parseErrListener = STIXPatternErrorListener()
lexer = STIXPatternLexer(pattern)
# it always adds a console listener by default... remove it.
lexer.removeErrorListeners()
stream = CommonTokenStream(lexer)
parser = STIXPatternParser(stream)
parser.buildParseTrees = True
# it always adds a console listener by default... remove it.
parser.removeErrorListeners()
parser.addErrorListener(parseErrListener)
# To improve error messages, replace "<INVALID>" in the literal
# names with symbolic names. This is a hack, but seemed like
# the simplest workaround.
for i, lit_name in enumerate(parser.literalNames):
if lit_name == u"<INVALID>":
parser.literalNames[i] = parser.symbolicNames[i]
tree = parser.pattern()
builder = STIXPatternVisitorForSTIX2(module_suffix, module_name)
return builder.visit(tree)

View File

@ -6,6 +6,8 @@ import binascii
import datetime
import re
import six
from .utils import parse_into_datetime
@ -13,6 +15,14 @@ def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
def quote_if_needed(x):
if isinstance(x, six.string_types):
if x.find("-") != -1:
if not x.startswith("'"):
return "'" + x + "'"
return x
class _Constant(object):
pass
@ -23,11 +33,13 @@ class StringConstant(_Constant):
Args:
value (str): string value
"""
def __init__(self, value):
def __init__(self, value, from_parse_tree=False):
self.needs_to_be_quoted = not from_parse_tree
self.value = value
def __str__(self):
return "'%s'" % escape_quotes_and_backslashes(self.value)
return "'%s'" % (escape_quotes_and_backslashes(self.value) if self.needs_to_be_quoted else self.value)
class TimestampConstant(_Constant):
@ -86,8 +98,8 @@ class BooleanConstant(_Constant):
self.value = value
return
trues = ['true', 't']
falses = ['false', 'f']
trues = ['true', 't', '1']
falses = ['false', 'f', '0']
try:
if value.lower() in trues:
self.value = True
@ -143,7 +155,7 @@ class HashConstant(StringConstant):
vocab_key = _HASH_REGEX[key][1]
if not re.match(_HASH_REGEX[key][0], value):
raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key))
self.value = value
super(HashConstant, self).__init__(value)
class BinaryConstant(_Constant):
@ -152,7 +164,13 @@ class BinaryConstant(_Constant):
Args:
value (str): base64 encoded string value
"""
def __init__(self, value):
def __init__(self, value, from_parse_tree=False):
# support with or without a 'b'
if from_parse_tree:
m = re.match("^b'(.+)'$", value)
if m:
value = m.group(1)
try:
base64.b64decode(value)
self.value = value
@ -169,10 +187,17 @@ class HexConstant(_Constant):
Args:
value (str): hexadecimal value
"""
def __init__(self, value):
if not re.match('^([a-fA-F0-9]{2})+$', value):
raise ValueError("must contain an even number of hexadecimal characters")
self.value = value
def __init__(self, value, from_parse_tree=False):
# support with or without an 'h'
if not from_parse_tree and re.match('^([a-fA-F0-9]{2})+$', value):
self.value = value
else:
m = re.match("^h'(([a-fA-F0-9]{2})+)'$", value)
if m:
self.value = m.group(1)
else:
raise ValueError("must contain an even number of hexadecimal characters")
def __str__(self):
return "h'%s'" % self.value
@ -185,10 +210,11 @@ class ListConstant(_Constant):
value (list): list of values
"""
def __init__(self, values):
self.value = values
# handle _Constants or make a _Constant
self.value = [x if isinstance(x, _Constant) else make_constant(x) for x in values]
def __str__(self):
return "(" + ", ".join([("%s" % make_constant(x)) for x in self.value]) + ")"
return "(" + ", ".join(["%s" % x for x in self.value]) + ")"
def make_constant(value):
@ -229,7 +255,10 @@ class _ObjectPathComponent(object):
parse1 = component_name.split("[")
return ListObjectPathComponent(parse1[0], parse1[1][:-1])
else:
return BasicObjectPathComponent(component_name)
return BasicObjectPathComponent(component_name, False)
def __str__(self):
return quote_if_needed(self.property_name)
class BasicObjectPathComponent(_ObjectPathComponent):
@ -243,14 +272,11 @@ class BasicObjectPathComponent(_ObjectPathComponent):
property_name (str): object property name
is_key (bool): is dictionary key, default: False
"""
def __init__(self, property_name, is_key=False):
def __init__(self, property_name, is_key):
self.property_name = property_name
# TODO: set is_key to True if this component is a dictionary key
# self.is_key = is_key
def __str__(self):
return self.property_name
class ListObjectPathComponent(_ObjectPathComponent):
"""List object path component (for an observation or expression)
@ -264,7 +290,7 @@ class ListObjectPathComponent(_ObjectPathComponent):
self.index = index
def __str__(self):
return "%s[%s]" % (self.property_name, self.index)
return "%s[%s]" % (quote_if_needed(self.property_name), self.index)
class ReferenceObjectPathComponent(_ObjectPathComponent):
@ -276,9 +302,6 @@ class ReferenceObjectPathComponent(_ObjectPathComponent):
def __init__(self, reference_property_name):
self.property_name = reference_property_name
def __str__(self):
return self.property_name
class ObjectPath(object):
"""Pattern operand object (property) path
@ -289,12 +312,14 @@ class ObjectPath(object):
"""
def __init__(self, object_type_name, property_path):
self.object_type_name = object_type_name
self.property_path = [x if isinstance(x, _ObjectPathComponent) else
_ObjectPathComponent.create_ObjectPathComponent(x)
for x in property_path]
self.property_path = [
x if isinstance(x, _ObjectPathComponent) else
_ObjectPathComponent.create_ObjectPathComponent(x)
for x in property_path
]
def __str__(self):
return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path]))
return "%s:%s" % (self.object_type_name, ".".join(["%s" % quote_if_needed(x) for x in self.property_path]))
def merge(self, other):
"""Extend the object property with that of the supplied object property path"""

View File

@ -3,43 +3,61 @@ import datetime
import pytest
import stix2
from stix2.pattern_visitor import create_pattern_object
def test_create_comparison_expression():
exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant("aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "SHA-256")) # noqa
exp = stix2.EqualityComparisonExpression(
"file:hashes.'SHA-256'",
stix2.HashConstant("aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "SHA-256"),
) # noqa
assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'"
def test_boolean_expression():
exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value",
stix2.StringConstant(".+\\@example\\.com$"))
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$"))
exp1 = stix2.MatchesComparisonExpression(
"email-message:from_ref.value",
stix2.StringConstant(".+\\@example\\.com$"),
)
exp2 = stix2.MatchesComparisonExpression(
"email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$"),
)
exp = stix2.AndBooleanExpression([exp1, exp2])
assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa
def test_boolean_expression_with_parentheses():
exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message",
[stix2.ReferenceObjectPathComponent("from_ref"),
stix2.BasicObjectPathComponent("value")]),
stix2.StringConstant(".+\\@example\\.com$"))
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$"))
exp1 = stix2.MatchesComparisonExpression(
stix2.ObjectPath(
"email-message",
[
stix2.ReferenceObjectPathComponent("from_ref"),
stix2.BasicObjectPathComponent("value", False),
],
),
stix2.StringConstant(".+\\@example\\.com$"),
)
exp2 = stix2.MatchesComparisonExpression(
"email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$"),
)
exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2]))
assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa
def test_hash_followed_by_registryKey_expression_python_constant():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
hash_exp = stix2.EqualityComparisonExpression(
"file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"),
)
o_exp1 = stix2.ObservationExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
reg_exp = stix2.EqualityComparisonExpression(
stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"),
)
o_exp2 = stix2.ObservationExpression(reg_exp)
fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp)
@ -49,11 +67,15 @@ def test_hash_followed_by_registryKey_expression_python_constant():
def test_hash_followed_by_registryKey_expression():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
hash_exp = stix2.EqualityComparisonExpression(
"file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"),
)
o_exp1 = stix2.ObservationExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
reg_exp = stix2.EqualityComparisonExpression(
stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"),
)
o_exp2 = stix2.ObservationExpression(reg_exp)
fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp)
@ -63,31 +85,44 @@ def test_hash_followed_by_registryKey_expression():
def test_file_observable_expression():
exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256'))
exp1 = stix2.EqualityComparisonExpression(
"file:hashes.'SHA-256'",
stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256',
),
)
exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf"))
bool_exp = stix2.ObservationExpression(stix2.AndBooleanExpression([exp1, exp2]))
assert str(bool_exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa
@pytest.mark.parametrize("observation_class, op", [
(stix2.AndObservationExpression, 'AND'),
(stix2.OrObservationExpression, 'OR'),
])
@pytest.mark.parametrize(
"observation_class, op", [
(stix2.AndObservationExpression, 'AND'),
(stix2.OrObservationExpression, 'OR'),
],
)
def test_multiple_file_observable_expression(observation_class, op):
exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant(
"bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c",
'SHA-256'))
exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("cead3f77f6cda6ec00f57d76c9a6879f", "MD5"))
exp1 = stix2.EqualityComparisonExpression(
"file:hashes.'SHA-256'",
stix2.HashConstant(
"bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c",
'SHA-256',
),
)
exp2 = stix2.EqualityComparisonExpression(
"file:hashes.MD5",
stix2.HashConstant("cead3f77f6cda6ec00f57d76c9a6879f", "MD5"),
)
bool1_exp = stix2.OrBooleanExpression([exp1, exp2])
exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256'))
exp3 = stix2.EqualityComparisonExpression(
"file:hashes.'SHA-256'",
stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256',
),
)
op1_exp = stix2.ObservationExpression(bool1_exp)
op2_exp = stix2.ObservationExpression(exp3)
exp = observation_class([op1_exp, op2_exp])
@ -97,34 +132,46 @@ def test_multiple_file_observable_expression(observation_class, op):
def test_root_types():
ast = stix2.ObservationExpression(
stix2.AndBooleanExpression(
[stix2.ParentheticalExpression(
stix2.OrBooleanExpression([
stix2.EqualityComparisonExpression("a:b", stix2.StringConstant("1")),
stix2.EqualityComparisonExpression("b:c", stix2.StringConstant("2"))])),
stix2.EqualityComparisonExpression(u"b:d", stix2.StringConstant("3"))]))
[
stix2.ParentheticalExpression(
stix2.OrBooleanExpression([
stix2.EqualityComparisonExpression("a:b", stix2.StringConstant("1")),
stix2.EqualityComparisonExpression("b:c", stix2.StringConstant("2")),
]),
),
stix2.EqualityComparisonExpression(u"b:d", stix2.StringConstant("3")),
],
),
)
assert str(ast) == "[(a:b = '1' OR b:c = '2') AND b:d = '3']"
def test_artifact_payload():
exp1 = stix2.EqualityComparisonExpression("artifact:mime_type",
"application/vnd.tcpdump.pcap")
exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin",
stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00"))
exp1 = stix2.EqualityComparisonExpression(
"artifact:mime_type",
"application/vnd.tcpdump.pcap",
)
exp2 = stix2.MatchesComparisonExpression(
"artifact:payload_bin",
stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00"),
)
and_exp = stix2.ObservationExpression(stix2.AndBooleanExpression([exp1, exp2]))
assert str(and_exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa
def test_greater_than_python_constant():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", 7.0)
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.'windows-pebinary-ext'.sections[*].entropy", 7.0)
exp = stix2.ObservationExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
assert str(exp) == "[file:extensions.'windows-pebinary-ext'.sections[*].entropy > 7.0]"
def test_greater_than():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
stix2.FloatConstant(7.0))
exp1 = stix2.GreaterThanComparisonExpression(
"file:extensions.'windows-pebinary-ext'.sections[*].entropy",
stix2.FloatConstant(7.0),
)
exp = stix2.ObservationExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
assert str(exp) == "[file:extensions.'windows-pebinary-ext'.sections[*].entropy > 7.0]"
def test_less_than():
@ -133,73 +180,123 @@ def test_less_than():
def test_greater_than_or_equal():
exp = stix2.GreaterThanEqualComparisonExpression("file:size",
1024)
exp = stix2.GreaterThanEqualComparisonExpression(
"file:size",
1024,
)
assert str(exp) == "file:size >= 1024"
def test_less_than_or_equal():
exp = stix2.LessThanEqualComparisonExpression("file:size",
1024)
exp = stix2.LessThanEqualComparisonExpression(
"file:size",
1024,
)
assert str(exp) == "file:size <= 1024"
def test_not():
exp = stix2.LessThanComparisonExpression("file:size",
1024,
negated=True)
exp = stix2.LessThanComparisonExpression(
"file:size",
1024,
negated=True,
)
assert str(exp) == "file:size NOT < 1024"
def test_and_observable_expression():
exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1007")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Peter")])
exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1008")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Paul")])
exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1009")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Mary")])
exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1),
stix2.ObservationExpression(exp2),
stix2.ObservationExpression(exp3)])
exp1 = stix2.AndBooleanExpression([
stix2.EqualityComparisonExpression(
"user-account:account_type",
"unix",
),
stix2.EqualityComparisonExpression(
"user-account:user_id",
stix2.StringConstant("1007"),
),
stix2.EqualityComparisonExpression(
"user-account:account_login",
"Peter",
),
])
exp2 = stix2.AndBooleanExpression([
stix2.EqualityComparisonExpression(
"user-account:account_type",
"unix",
),
stix2.EqualityComparisonExpression(
"user-account:user_id",
stix2.StringConstant("1008"),
),
stix2.EqualityComparisonExpression(
"user-account:account_login",
"Paul",
),
])
exp3 = stix2.AndBooleanExpression([
stix2.EqualityComparisonExpression(
"user-account:account_type",
"unix",
),
stix2.EqualityComparisonExpression(
"user-account:user_id",
stix2.StringConstant("1009"),
),
stix2.EqualityComparisonExpression(
"user-account:account_login",
"Mary",
),
])
exp = stix2.AndObservationExpression([
stix2.ObservationExpression(exp1),
stix2.ObservationExpression(exp2),
stix2.ObservationExpression(exp3),
])
assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa
def test_invalid_and_observable_expression():
with pytest.raises(ValueError) as excinfo:
stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:display_name",
"admin"),
stix2.EqualityComparisonExpression("email-addr:display_name",
stix2.StringConstant("admin"))])
stix2.AndBooleanExpression([
stix2.EqualityComparisonExpression(
"user-account:display_name",
"admin",
),
stix2.EqualityComparisonExpression(
"email-addr:display_name",
stix2.StringConstant("admin"),
),
])
assert "All operands to an 'AND' expression must have the same object type" in str(excinfo)
def test_hex():
exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type",
"image/bmp"),
stix2.EqualityComparisonExpression("file:magic_number_hex",
stix2.HexConstant("ffd8"))])
exp_and = stix2.AndBooleanExpression([
stix2.EqualityComparisonExpression(
"file:mime_type",
"image/bmp",
),
stix2.EqualityComparisonExpression(
"file:magic_number_hex",
stix2.HexConstant("ffd8"),
),
])
exp = stix2.ObservationExpression(exp_and)
assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']"
def test_multiple_qualifiers():
exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type",
"domain-name"),
stix2.EqualityComparisonExpression("network-traffic:dst_ref.value",
"example.com")])
exp_and = stix2.AndBooleanExpression([
stix2.EqualityComparisonExpression(
"network-traffic:dst_ref.type",
"domain-name",
),
stix2.EqualityComparisonExpression(
"network-traffic:dst_ref.value",
"example.com",
),
])
exp_ob = stix2.ObservationExpression(exp_and)
qual_rep = stix2.RepeatQualifier(5)
qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800))
@ -208,8 +305,10 @@ def test_multiple_qualifiers():
def test_set_op():
exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64"))
exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression(
"network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64",
))
assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"
@ -219,35 +318,45 @@ def test_timestamp():
def test_boolean():
exp = stix2.EqualityComparisonExpression("email-message:is_multipart",
True)
exp = stix2.EqualityComparisonExpression(
"email-message:is_multipart",
True,
)
assert str(exp) == "email-message:is_multipart = true"
def test_binary():
const = stix2.BinaryConstant("dGhpcyBpcyBhIHRlc3Q=")
exp = stix2.EqualityComparisonExpression("artifact:payload_bin",
const)
exp = stix2.EqualityComparisonExpression(
"artifact:payload_bin",
const,
)
assert str(exp) == "artifact:payload_bin = b'dGhpcyBpcyBhIHRlc3Q='"
def test_list():
exp = stix2.InComparisonExpression("process:name",
['proccy', 'proximus', 'badproc'])
exp = stix2.InComparisonExpression(
"process:name",
['proccy', 'proximus', 'badproc'],
)
assert str(exp) == "process:name IN ('proccy', 'proximus', 'badproc')"
def test_list2():
# alternate way to construct an "IN" Comparison Expression
exp = stix2.EqualityComparisonExpression("process:name",
['proccy', 'proximus', 'badproc'])
exp = stix2.EqualityComparisonExpression(
"process:name",
['proccy', 'proximus', 'badproc'],
)
assert str(exp) == "process:name IN ('proccy', 'proximus', 'badproc')"
def test_invalid_constant_type():
with pytest.raises(ValueError) as excinfo:
stix2.EqualityComparisonExpression("artifact:payload_bin",
{'foo': 'bar'})
stix2.EqualityComparisonExpression(
"artifact:payload_bin",
{'foo': 'bar'},
)
assert 'Unable to create a constant' in str(excinfo)
@ -269,20 +378,22 @@ def test_invalid_float_constant():
assert 'must be a float' in str(excinfo)
@pytest.mark.parametrize("data, result", [
(True, True),
(False, False),
('True', True),
('False', False),
('true', True),
('false', False),
('t', True),
('f', False),
('T', True),
('F', False),
(1, True),
(0, False),
])
@pytest.mark.parametrize(
"data, result", [
(True, True),
(False, False),
('True', True),
('False', False),
('true', True),
('false', False),
('t', True),
('f', False),
('T', True),
('F', False),
(1, True),
(0, False),
],
)
def test_boolean_constant(data, result):
boolean = stix2.BooleanConstant(data)
assert boolean.value == result
@ -294,10 +405,12 @@ def test_invalid_boolean_constant():
assert 'must be a boolean' in str(excinfo)
@pytest.mark.parametrize("hashtype, data", [
('MD5', 'zzz'),
('ssdeep', 'zzz=='),
])
@pytest.mark.parametrize(
"hashtype, data", [
('MD5', 'zzz'),
('ssdeep', 'zzz=='),
],
)
def test_invalid_hash_constant(hashtype, data):
with pytest.raises(ValueError) as excinfo:
stix2.HashConstant(data, hashtype)
@ -317,20 +430,26 @@ def test_invalid_binary_constant():
def test_escape_quotes_and_backslashes():
exp = stix2.MatchesComparisonExpression("file:name",
"^Final Report.+\\.exe$")
exp = stix2.MatchesComparisonExpression(
"file:name",
"^Final Report.+\\.exe$",
)
assert str(exp) == "file:name MATCHES '^Final Report.+\\\\.exe$'"
def test_like():
exp = stix2.LikeComparisonExpression("directory:path",
"C:\\Windows\\%\\foo")
exp = stix2.LikeComparisonExpression(
"directory:path",
"C:\\Windows\\%\\foo",
)
assert str(exp) == "directory:path LIKE 'C:\\\\Windows\\\\%\\\\foo'"
def test_issuperset():
exp = stix2.IsSupersetComparisonExpression("ipv4-addr:value",
"198.51.100.0/24")
exp = stix2.IsSupersetComparisonExpression(
"ipv4-addr:value",
"198.51.100.0/24",
)
assert str(exp) == "ipv4-addr:value ISSUPERSET '198.51.100.0/24'"
@ -352,24 +471,32 @@ def test_invalid_within_qualifier():
def test_startstop_qualifier():
qual = stix2.StartStopQualifier(stix2.TimestampConstant('2016-06-01T00:00:00Z'),
datetime.datetime(2017, 3, 12, 8, 30, 0))
qual = stix2.StartStopQualifier(
stix2.TimestampConstant('2016-06-01T00:00:00Z'),
datetime.datetime(2017, 3, 12, 8, 30, 0),
)
assert str(qual) == "START t'2016-06-01T00:00:00Z' STOP t'2017-03-12T08:30:00Z'"
qual2 = stix2.StartStopQualifier(datetime.date(2016, 6, 1),
stix2.TimestampConstant('2016-07-01T00:00:00Z'))
qual2 = stix2.StartStopQualifier(
datetime.date(2016, 6, 1),
stix2.TimestampConstant('2016-07-01T00:00:00Z'),
)
assert str(qual2) == "START t'2016-06-01T00:00:00Z' STOP t'2016-07-01T00:00:00Z'"
def test_invalid_startstop_qualifier():
with pytest.raises(ValueError) as excinfo:
stix2.StartStopQualifier('foo',
stix2.TimestampConstant('2016-06-01T00:00:00Z'))
stix2.StartStopQualifier(
'foo',
stix2.TimestampConstant('2016-06-01T00:00:00Z'),
)
assert 'is not a valid argument for a Start/Stop Qualifier' in str(excinfo)
with pytest.raises(ValueError) as excinfo:
stix2.StartStopQualifier(datetime.date(2016, 6, 1),
'foo')
stix2.StartStopQualifier(
datetime.date(2016, 6, 1),
'foo',
)
assert 'is not a valid argument for a Start/Stop Qualifier' in str(excinfo)
@ -377,3 +504,20 @@ def test_make_constant_already_a_constant():
str_const = stix2.StringConstant('Foo')
result = stix2.patterns.make_constant(str_const)
assert result is str_const
def test_parsing_comparison_expression():
patt_obj = create_pattern_object("[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']")
assert str(patt_obj) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']"
def test_parsing_qualified_expression():
patt_obj = create_pattern_object(
"[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS")
assert str(
patt_obj) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS"
def test_list_constant():
patt_obj = create_pattern_object("[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]")
assert str(patt_obj) == "[network-traffic:src_ref.value IN ('10.0.0.0', '10.0.0.1', '10.0.0.2')]"