Merge pull request #445 from chisholm/pattern_equivalence

Pattern equivalence
pull/1/head
Chris Lenk 2020-09-11 09:25:58 -04:00 committed by GitHub
commit 18c6f49e5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 2572 additions and 4 deletions

View File

View File

@ -0,0 +1,115 @@
import stix2
from stix2.equivalence.patterns.compare.observation import (
observation_expression_cmp,
)
from stix2.equivalence.patterns.transform import (
ChainTransformer, SettleTransformer,
)
from stix2.equivalence.patterns.transform.observation import (
AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer,
DNFTransformer, FlattenTransformer, OrderDedupeTransformer,
)
import stix2.pattern_visitor
# Lazy-initialize
_pattern_canonicalizer = None


def _get_pattern_canonicalizer():
    """
    Get a canonicalization transformer for STIX patterns.

    The transformers are either stateless or contain no state which changes
    with each use, so a single canonicalizer is built once and cached at
    module level.

    :return: The transformer
    """
    global _pattern_canonicalizer
    # Explicit "is None" check: we are testing "not yet built", not the
    # truthiness of a transformer object.
    if _pattern_canonicalizer is None:
        canonicalize_comp_expr = \
            CanonicalizeComparisonExpressionsTransformer()
        # Observation-expression simplification: flatten nested same-operator
        # nodes, sort/dedupe children, then apply boolean absorption...
        obs_expr_flatten = FlattenTransformer()
        obs_expr_order = OrderDedupeTransformer()
        obs_expr_absorb = AbsorptionTransformer()
        obs_simplify = ChainTransformer(
            obs_expr_flatten, obs_expr_order, obs_expr_absorb,
        )
        # ...repeated until the AST stops changing.
        obs_settle_simplify = SettleTransformer(obs_simplify)
        obs_dnf = DNFTransformer()
        # Canonicalize comparisons, simplify, convert to DNF, then
        # re-simplify the DNF result.
        _pattern_canonicalizer = ChainTransformer(
            canonicalize_comp_expr,
            obs_settle_simplify, obs_dnf, obs_settle_simplify,
        )
    return _pattern_canonicalizer
def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION):
    """
    Determine whether two STIX patterns are semantically equivalent.

    :param pattern1: The first STIX pattern
    :param pattern2: The second STIX pattern
    :param stix_version: The STIX version to use for pattern parsing, as a
        string ("2.0", "2.1", etc). Defaults to library-wide default version.
    :return: True if the patterns are semantically equivalent; False if not
    """
    # Parse both patterns into ASTs.
    ast1, ast2 = (
        stix2.pattern_visitor.create_pattern_object(
            pattern, version=stix_version,
        )
        for pattern in (pattern1, pattern2)
    )
    # Canonicalize both ASTs; equivalent patterns canonicalize to ASTs which
    # compare equal.
    canonicalizer = _get_pattern_canonicalizer()
    canon_ast1, _ = canonicalizer.transform(ast1)
    canon_ast2, _ = canonicalizer.transform(ast2)
    return observation_expression_cmp(canon_ast1, canon_ast2) == 0
def find_equivalent_patterns(
    search_pattern, patterns, stix_version=stix2.DEFAULT_VERSION,
):
    """
    Find patterns from a sequence which are equivalent to a given pattern.
    This is more efficient than using equivalent_patterns() in a loop, because
    it doesn't re-canonicalize the search pattern over and over. This works
    on an input iterable and is implemented as a generator of matches. So you
    can "stream" patterns in and matching patterns will be streamed out.

    :param search_pattern: A search pattern as a string
    :param patterns: An iterable over patterns as strings
    :param stix_version: The STIX version to use for pattern parsing, as a
        string ("2.0", "2.1", etc). Defaults to library-wide default version.
    :return: A generator iterator producing the semantically equivalent
        patterns
    """
    canonicalizer = _get_pattern_canonicalizer()
    # Canonicalize the search pattern once, up front.
    search_ast = stix2.pattern_visitor.create_pattern_object(
        search_pattern, version=stix_version,
    )
    canon_search_ast, _ = canonicalizer.transform(search_ast)
    for candidate in patterns:
        candidate_ast = stix2.pattern_visitor.create_pattern_object(
            candidate, version=stix_version,
        )
        canon_candidate_ast, _ = canonicalizer.transform(candidate_ast)
        is_match = observation_expression_cmp(
            canon_search_ast, canon_candidate_ast,
        ) == 0
        if is_match:
            yield candidate

View File

@ -0,0 +1,91 @@
"""
Some generic comparison utility functions.
"""
def generic_cmp(value1, value2):
    """
    Generic comparator of values which uses the builtin '<' and '>' operators.
    Assumes the values can be compared that way.

    :param value1: The first value
    :param value2: The second value
    :return: -1, 0, or 1 depending on whether value1 is less, equal, or greater
        than value2
    """
    # Standard bool-subtraction cmp idiom: each comparison contributes 1 or 0.
    return (value1 > value2) - (value1 < value2)
def iter_lex_cmp(seq1, seq2, cmp):
    """
    Generic lexicographical compare function, which works on two iterables and
    a comparator function.

    :param seq1: The first iterable
    :param seq2: The second iterable
    :param cmp: a two-arg callable comparator for values iterated over. It
        must behave analogously to this function, returning <0, 0, or >0 to
        express the ordering of the two values.
    :return: <0 if seq1 < seq2; >0 if seq1 > seq2; 0 if they're equal
    """
    iter1 = iter(seq1)
    iter2 = iter(seq2)
    # Unique sentinel marks an exhausted iterator; it can never collide with
    # a real sequence value.
    end = object()
    while True:
        elem1 = next(iter1, end)
        elem2 = next(iter2, end)
        done1 = elem1 is end
        done2 = elem2 is end
        if done1 and done2:
            # same length, all elements equal
            return 0
        if done1:
            # seq1 is a proper prefix of seq2; the shorter one is less
            return -1
        if done2:
            return 1
        ordering = cmp(elem1, elem2)
        if ordering != 0:
            # First differing position decides the result.
            return ordering
def iter_in(value, seq, cmp):
    """
    A function behaving like the "in" Python operator, but which works with a
    comparator function. This function checks whether the given value is
    contained in the given iterable.

    :param value: A value
    :param seq: An iterable
    :param cmp: A 2-arg comparator function which must return 0 if the args
        are equal
    :return: True if the value is found in the iterable, False if it is not
    """
    # any() short-circuits on the first match, like the original loop+break.
    return any(cmp(value, candidate) == 0 for candidate in seq)

View File

@ -0,0 +1,351 @@
"""
Comparison utilities for STIX pattern comparison expressions.
"""
import base64
import functools
from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp
from stix2.patterns import (
AndBooleanExpression, BinaryConstant, BooleanConstant, FloatConstant,
HexConstant, IntegerConstant, ListConstant, ListObjectPathComponent,
OrBooleanExpression, StringConstant, TimestampConstant,
_ComparisonExpression,
)
# Canonical sort order for comparison operators: expressions which differ
# only in operator are ordered by the operator's position in this tuple.
_COMPARISON_OP_ORDER = (
    "=", "!=", "<>", "<", "<=", ">", ">=",
    "IN", "LIKE", "MATCHES", "ISSUBSET", "ISSUPERSET",
)
# Canonical sort order for constant AST types, used when comparing constants
# of differing types.
_CONSTANT_TYPE_ORDER = (
    # ints/floats come first, but have special handling since the types are
    # treated equally as a generic "number" type. So they aren't in this list.
    # See constant_cmp().
    StringConstant, BooleanConstant,
    TimestampConstant, HexConstant, BinaryConstant, ListConstant,
)
def generic_constant_cmp(const1, const2):
    """
    Generic comparator for most _Constant instances. They must have a "value"
    attribute whose value supports the builtin comparison operators.

    :param const1: The first _Constant instance
    :param const2: The second _Constant instance
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # Unwrap the raw values from the _Constant wrappers, then compare.
    raw1 = const1.value
    raw2 = const2.value
    return generic_cmp(raw1, raw2)
def bool_cmp(value1, value2):
    """
    Compare two boolean constants. True orders before False.

    :param value1: The first BooleanConstant instance
    :param value2: The second BooleanConstant instance
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # unwrap from _Constant instances
    raw1 = value1.value
    raw2 = value2.value
    # Compare by truthiness; equal truthiness means equal.
    if bool(raw1) == bool(raw2):
        return 0
    # Exactly one is truthy here; True sorts before False.
    return -1 if raw1 else 1
def hex_cmp(value1, value2):
    """
    Compare two STIX "hex" values. This decodes to bytes and compares that.
    It does *not* do a string compare on the hex representations.

    :param value1: The first HexConstant
    :param value2: The second HexConstant
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # Decode both hex strings to raw bytes, then order on the byte strings.
    decoded1 = bytes.fromhex(value1.value)
    decoded2 = bytes.fromhex(value2.value)
    return generic_cmp(decoded1, decoded2)
def bin_cmp(value1, value2):
    """
    Compare two STIX "binary" values. This decodes to bytes and compares that.
    It does *not* do a string compare on the base64 representations.

    :param value1: The first BinaryConstant
    :param value2: The second BinaryConstant
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # Decode both base64 strings to raw bytes, then order on the byte strings.
    decoded1 = base64.standard_b64decode(value1.value)
    decoded2 = base64.standard_b64decode(value2.value)
    return generic_cmp(decoded1, decoded2)
def list_cmp(value1, value2):
    """
    Compare lists order-insensitively.

    :param value1: The first ListConstant
    :param value2: The second ListConstant
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # Sorting both operand lists first makes the comparison independent of
    # the original element order.
    sort_key = functools.cmp_to_key(constant_cmp)
    ordered1 = sorted(value1.value, key=sort_key)
    ordered2 = sorted(value2.value, key=sort_key)
    return iter_lex_cmp(ordered1, ordered2, constant_cmp)
# Maps a constant AST type to the comparator used for two constants of that
# same type.
_CONSTANT_COMPARATORS = {
    # We have special handling for ints/floats, so no entries for those AST
    # classes here. See constant_cmp().
    StringConstant: generic_constant_cmp,
    BooleanConstant: bool_cmp,
    TimestampConstant: generic_constant_cmp,
    HexConstant: hex_cmp,
    BinaryConstant: bin_cmp,
    ListConstant: list_cmp,
}
def object_path_component_cmp(comp1, comp2):
    """
    Compare a string/int to another string/int; this induces an ordering over
    all strings and ints. It is used to perform a lexicographical sort on
    object paths.

    Ints and strings compare as usual to each other; ints compare less than
    strings.

    :param comp1: An object path component (string or int)
    :param comp2: An object path component (string or int)
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    both_ints = isinstance(comp1, int) and isinstance(comp2, int)
    both_strs = isinstance(comp1, str) and isinstance(comp2, str)
    # Same type: the builtin comparison operators apply directly.
    if both_ints or both_strs:
        return generic_cmp(comp1, comp2)
    # Mixed types: ints come before strings.
    return -1 if isinstance(comp1, int) else 1
def object_path_to_raw_values(path):
    """
    Converts the given ObjectPath instance to a list of strings and ints.

    All property names become strings, regardless of whether they're *_ref
    properties; "*" index steps become that string; and numeric index steps
    become integers.

    :param path: An ObjectPath instance
    :return: A generator iterator over the values
    """
    for step in path.property_path:
        if not isinstance(step, ListObjectPathComponent):
            yield step.property_name
            continue
        # A list component yields two values: the property name, then the
        # index.
        yield step.property_name
        index = step.index
        if index == "*" or isinstance(index, int):
            yield index
        else:
            # in case the index is a stringified int; convert to an actual int
            yield int(index)
def object_path_cmp(path1, path2):
    """
    Compare two object paths.

    :param path1: The first ObjectPath instance
    :param path2: The second ObjectPath instance
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # Order on the object type name first.
    type_name1 = path1.object_type_name
    type_name2 = path2.object_type_name
    if type_name1 != type_name2:
        return -1 if type_name1 < type_name2 else 1
    # Same type name. The AST lumps indices in with the previous key as a
    # single path component; split the components back into individually
    # comparable values and compare those lexicographically.
    return iter_lex_cmp(
        object_path_to_raw_values(path1),
        object_path_to_raw_values(path2),
        object_path_component_cmp,
    )
def comparison_operator_cmp(op1, op2):
    """
    Compare two comparison operators.

    :param op1: The first comparison operator (a string)
    :param op2: The second comparison operator (a string)
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    # Operators order by their position in the canonical operator tuple.
    return generic_cmp(
        _COMPARISON_OP_ORDER.index(op1),
        _COMPARISON_OP_ORDER.index(op2),
    )
def constant_cmp(value1, value2):
    """
    Compare two constants.

    Ints and floats are treated together as a generic "number" type, which
    orders before all other constant types. Other constants order first by
    type (per _CONSTANT_TYPE_ORDER), then by value, using the type-specific
    comparator from _CONSTANT_COMPARATORS.

    :param value1: The first _Constant instance
    :param value2: The second _Constant instance
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    :raises TypeError: If both constants are of the same type but no
        comparator is registered for that type
    """
    # Special handling for ints/floats: treat them generically as numbers,
    # ordered before all other types.
    if isinstance(value1, (IntegerConstant, FloatConstant)) \
            and isinstance(value2, (IntegerConstant, FloatConstant)):
        result = generic_constant_cmp(value1, value2)
    elif isinstance(value1, (IntegerConstant, FloatConstant)):
        result = -1
    elif isinstance(value2, (IntegerConstant, FloatConstant)):
        result = 1
    else:
        type1 = type(value1)
        type2 = type(value2)
        type1_idx = _CONSTANT_TYPE_ORDER.index(type1)
        type2_idx = _CONSTANT_TYPE_ORDER.index(type2)
        result = generic_cmp(type1_idx, type2_idx)
        if result == 0:
            # Types are the same; must compare values. Use an explicit
            # "is None" check for the missing-comparator case, rather than
            # relying on the truthiness of the dict lookup result.
            cmp_func = _CONSTANT_COMPARATORS.get(type1)
            if cmp_func is None:
                raise TypeError("Don't know how to compare " + type1.__name__)
            result = cmp_func(value1, value2)
    return result
def simple_comparison_expression_cmp(expr1, expr2):
    """
    Compare "simple" comparison expressions: those which aren't AND/OR
    combinations, just <path> <op> <value> comparisons.

    Ordering is lexicographic over: object path, operator, negation flag
    (non-negated first), constant value.

    :param expr1: first _ComparisonExpression instance
    :param expr2: second _ComparisonExpression instance
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    path_order = object_path_cmp(expr1.lhs, expr2.lhs)
    if path_order != 0:
        return path_order
    op_order = comparison_operator_cmp(expr1.operator, expr2.operator)
    if op_order != 0:
        return op_order
    # Tiebreak on the "negated" attribute: non-negated < negated.
    negated1 = bool(expr1.negated)
    negated2 = bool(expr2.negated)
    if negated1 != negated2:
        return 1 if negated1 else -1
    return constant_cmp(expr1.rhs, expr2.rhs)
def comparison_expression_cmp(expr1, expr2):
    """
    Compare two comparison expressions. This is sensitive to the order of the
    expressions' sub-components. To achieve an order-insensitive comparison,
    the ASTs must be canonically ordered first.

    :param expr1: The first comparison expression
    :param expr2: The second comparison expression
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    """
    simple1 = isinstance(expr1, _ComparisonExpression)
    simple2 = isinstance(expr2, _ComparisonExpression)
    if simple1 and simple2:
        return simple_comparison_expression_cmp(expr1, expr2)
    # One simple, one compound: simple ones come first.
    if simple1:
        return -1
    if simple2:
        return 1
    # Both compound: AND's order before OR's.
    if isinstance(expr1, AndBooleanExpression) \
            and isinstance(expr2, OrBooleanExpression):
        return -1
    if isinstance(expr1, OrBooleanExpression) \
            and isinstance(expr2, AndBooleanExpression):
        return 1
    # Both compound with the same boolean operator: sort according to
    # contents, recursing into sub-expressions via this same comparator.
    return iter_lex_cmp(
        expr1.operands, expr2.operands, comparison_expression_cmp,
    )

View File

@ -0,0 +1,123 @@
"""
Comparison utilities for STIX pattern observation expressions.
"""
from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp
from stix2.equivalence.patterns.compare.comparison import (
comparison_expression_cmp, generic_constant_cmp,
)
from stix2.patterns import (
AndObservationExpression, FollowedByObservationExpression,
ObservationExpression, OrObservationExpression,
QualifiedObservationExpression, RepeatQualifier, StartStopQualifier,
WithinQualifier, _CompoundObservationExpression,
)
# Canonical sort order for observation expression node types: simple
# observations first, qualified expressions last.
_OBSERVATION_EXPRESSION_TYPE_ORDER = (
    ObservationExpression, AndObservationExpression, OrObservationExpression,
    FollowedByObservationExpression, QualifiedObservationExpression,
)
# Canonical sort order for qualifier types, used when comparing two
# QualifiedObservationExpressions.
_QUALIFIER_TYPE_ORDER = (
    RepeatQualifier, WithinQualifier, StartStopQualifier,
)
def repeats_cmp(qual1, qual2):
    """
    Compare REPEATS qualifiers: orders by the repeat-count constant.
    """
    return generic_constant_cmp(
        qual1.times_to_repeat, qual2.times_to_repeat,
    )
def within_cmp(qual1, qual2):
    """
    Compare WITHIN qualifiers: orders by the number-of-seconds constant.
    """
    seconds1 = qual1.number_of_seconds
    seconds2 = qual2.number_of_seconds
    return generic_constant_cmp(seconds1, seconds2)
def startstop_cmp(qual1, qual2):
    """
    Compare START/STOP qualifiers: lexicographically orders by start time,
    breaking ties on stop time.
    """
    times1 = (qual1.start_time, qual1.stop_time)
    times2 = (qual2.start_time, qual2.stop_time)
    return iter_lex_cmp(times1, times2, generic_constant_cmp)
# Maps a qualifier type to the comparator used for two qualifiers of that
# same type.
_QUALIFIER_COMPARATORS = {
    RepeatQualifier: repeats_cmp,
    WithinQualifier: within_cmp,
    StartStopQualifier: startstop_cmp,
}
def observation_expression_cmp(expr1, expr2):
    """
    Compare two observation expression ASTs. This is sensitive to the order of
    the expressions' sub-components. To achieve an order-insensitive
    comparison, the ASTs must be canonically ordered first.

    Ordering is: by node type (per _OBSERVATION_EXPRESSION_TYPE_ORDER); then,
    for same-type nodes, by contents — the contained comparison expression for
    simple observations, the operand lists for compound nodes, and the
    qualifier (type, then details) followed by the qualified expression for
    qualified nodes.

    :param expr1: The first observation expression
    :param expr2: The second observation expression
    :return: <0, 0, or >0 depending on whether the first arg is less, equal or
        greater than the second
    :raises TypeError: If both expressions carry the same qualifier type but
        no comparator is registered for it in _QUALIFIER_COMPARATORS
    """
    type1 = type(expr1)
    type2 = type(expr2)
    # .index() raises ValueError for node types outside the canonical order.
    type1_idx = _OBSERVATION_EXPRESSION_TYPE_ORDER.index(type1)
    type2_idx = _OBSERVATION_EXPRESSION_TYPE_ORDER.index(type2)
    if type1_idx != type2_idx:
        result = generic_cmp(type1_idx, type2_idx)
    # else, both exprs are of same type.
    # If they're simple, use contained comparison expression order
    elif type1 is ObservationExpression:
        result = comparison_expression_cmp(
            expr1.operand, expr2.operand,
        )
    elif isinstance(expr1, _CompoundObservationExpression):
        # Both compound, and of same type (and/or/followedby): sort according
        # to contents.
        result = iter_lex_cmp(
            expr1.operands, expr2.operands, observation_expression_cmp,
        )
    else:  # QualifiedObservationExpression
        # Both qualified. Check qualifiers first; if they are the same,
        # use order of the qualified expressions.
        qual1_type = type(expr1.qualifier)
        qual2_type = type(expr2.qualifier)
        qual1_type_idx = _QUALIFIER_TYPE_ORDER.index(qual1_type)
        qual2_type_idx = _QUALIFIER_TYPE_ORDER.index(qual2_type)
        result = generic_cmp(qual1_type_idx, qual2_type_idx)
        if result == 0:
            # Same qualifier type; compare qualifier details
            qual_cmp = _QUALIFIER_COMPARATORS.get(qual1_type)
            if qual_cmp:
                result = qual_cmp(expr1.qualifier, expr2.qualifier)
            else:
                raise TypeError(
                    "Can't compare qualifier type: " + qual1_type.__name__,
                )
        if result == 0:
            # Same qualifier type and details; use qualified expression order
            result = observation_expression_cmp(
                expr1.observation_expression, expr2.observation_expression,
            )
    return result

View File

@ -0,0 +1,57 @@
"""
Generic AST transformation classes.
"""
class Transformer:
    """
    Abstract base class for AST transformers.
    """
    def transform(self, ast):
        """
        Transform the given AST and return the resulting AST.

        :param ast: The AST to transform
        :return: A 2-tuple: the transformed AST and a boolean indicating
            whether the transformation actually changed anything. The change
            detection is useful in situations where a transformation needs to
            be repeated until the AST stops changing.
        """
        # Subclasses must override this method.
        raise NotImplementedError("transform")
class ChainTransformer(Transformer):
    """
    A composite transformer which consists of a sequence of sub-transformers.
    Applying this transformer applies all sub-transformers in sequence, as
    a group.
    """
    def __init__(self, *transformers):
        # Sub-transformers are applied in the order given.
        self.__transformers = transformers

    def transform(self, ast):
        any_changed = False
        for sub_transformer in self.__transformers:
            ast, sub_changed = sub_transformer.transform(ast)
            # Report change if any sub-transformer changed the AST.
            any_changed = any_changed or sub_changed
        return ast, any_changed
class SettleTransformer(Transformer):
    """
    A transformer that repeatedly performs a transformation until that
    transformation no longer changes the AST. I.e. the AST has "settled".
    """
    def __init__(self, transform):
        self.__transformer = transform

    def transform(self, ast):
        any_changed = False
        while True:
            ast, step_changed = self.__transformer.transform(ast)
            if not step_changed:
                # A no-op pass means the AST has settled.
                break
            any_changed = True
        return ast, any_changed

View File

@ -0,0 +1,378 @@
"""
Transformation utilities for STIX pattern comparison expressions.
"""
import functools
import itertools
from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp
from stix2.equivalence.patterns.compare.comparison import (
comparison_expression_cmp,
)
from stix2.equivalence.patterns.transform import Transformer
from stix2.equivalence.patterns.transform.specials import (
ipv4_addr, ipv6_addr, windows_reg_key,
)
from stix2.patterns import (
AndBooleanExpression, OrBooleanExpression, ParentheticalExpression,
_BooleanExpression, _ComparisonExpression,
)
def _dupe_ast(ast):
    """
    Create a duplicate of the given AST.

    Note: the comparison expression "leaves", i.e. simple <path> <op> <value>
    comparisons are currently not duplicated, since canonicalization never
    changes them as of this writing. Revisit this if/when necessary.

    :param ast: The AST to duplicate
    :return: The duplicate AST
    :raises TypeError: If the AST root is of an unsupported type
    """
    # Boolean nodes duplicate recursively, preserving their node type.
    for boolean_type in (AndBooleanExpression, OrBooleanExpression):
        if isinstance(ast, boolean_type):
            return boolean_type(
                [_dupe_ast(operand) for operand in ast.operands],
            )
    if isinstance(ast, _ComparisonExpression):
        # Leaves are shared, not copied. Change this to create a dupe, if we
        # ever need to change simple comparison expressions as part of
        # canonicalization.
        return ast
    raise TypeError("Can't duplicate " + type(ast).__name__)
class ComparisonExpressionTransformer(Transformer):
    """
    Transformer base class with special support for transforming comparison
    expressions. The transform method implemented here performs a bottom-up
    in-place transformation, with support for some comparison
    expression-specific callbacks.

    Specifically, subclasses can implement methods:
        "transform_or" for OR nodes
        "transform_and" for AND nodes
        "transform_comparison" for plain comparison nodes (<prop> <op> <value>)
        "transform_default" for both types of nodes

    "transform_default" is a fallback, if a type-specific callback is not
    found. The default implementation does nothing to the AST. The
    type-specific callbacks are preferred over the default, if both exist.

    In all cases, the callbacks are called with an AST for a subtree rooted at
    the appropriate node type, where the subtree's children have already been
    transformed. They must return the same thing as the base transform()
    method: a 2-tuple with the transformed AST and a boolean for change
    detection. See doc for the superclass' method.

    This process currently silently drops parenthetical nodes.
    """
    def transform(self, ast):
        """
        Recursively transform the given comparison expression AST, bottom-up.

        :param ast: The comparison expression AST
        :return: A 2-tuple: the transformed AST and a change-detection boolean
        :raises TypeError: If ast is not a comparison expression node
        """
        if isinstance(ast, _BooleanExpression):
            changed = False
            # Transform children first (bottom-up), replacing them in place.
            for i, operand in enumerate(ast.operands):
                operand_result, this_changed = self.transform(operand)
                if this_changed:
                    changed = True
                ast.operands[i] = operand_result
            # Then dispatch a callback for this node itself.
            result, this_changed = self.__dispatch_transform(ast)
            if this_changed:
                changed = True
        elif isinstance(ast, _ComparisonExpression):
            result, changed = self.__dispatch_transform(ast)
        elif isinstance(ast, ParentheticalExpression):
            # Drop these: transform and return the wrapped expression instead.
            # NOTE(review): the paren removal itself is not reported as a
            # change — only changes from the inner transform are.
            result, changed = self.transform(ast.expression)
        else:
            raise TypeError("Not a comparison expression: " + str(ast))
        return result, changed
    def __dispatch_transform(self, ast):
        """
        Invoke a transformer callback method based on the given ast root node
        type.

        :param ast: The AST
        :return: The callback's result
        """
        if isinstance(ast, AndBooleanExpression):
            meth = getattr(self, "transform_and", self.transform_default)
        elif isinstance(ast, OrBooleanExpression):
            meth = getattr(self, "transform_or", self.transform_default)
        elif isinstance(ast, _ComparisonExpression):
            meth = getattr(
                self, "transform_comparison", self.transform_default,
            )
        else:
            meth = self.transform_default
        return meth(ast)
    def transform_default(self, ast):
        """
        Override to handle transforming AST nodes which don't have a more
        specific method implemented. The default is a no-op.
        """
        return ast, False
class OrderDedupeTransformer(
    ComparisonExpressionTransformer
):
    """
    Canonically order the children of all nodes in the AST. Because the
    deduping algorithm is based on sorted data, this transformation also does
    deduping.

    E.g.:
        A and A => A
        A or A => A
    """
    def __transform(self, ast):
        """
        Sort/dedupe children. AND and OR can be treated identically.

        :param ast: The comparison expression AST
        :return: 2-tuple of the same AST node (with sorted, deduped children)
            and a change-detection boolean
        """
        sorted_children = sorted(
            ast.operands, key=functools.cmp_to_key(comparison_expression_cmp),
        )
        # Adjacent equal elements collapse to one via groupby(); sorting
        # first guarantees equal elements are adjacent.
        deduped_children = [
            # Apparently when using a key function, groupby()'s "keys" are the
            # key wrappers, not actual sequence values. Obviously we don't
            # need key wrappers in our ASTs!
            k.obj for k, _ in itertools.groupby(
                sorted_children, key=functools.cmp_to_key(
                    comparison_expression_cmp,
                ),
            )
        ]
        # Change is detected by lexicographically comparing the old child
        # list against the sorted/deduped one.
        changed = iter_lex_cmp(
            ast.operands, deduped_children, comparison_expression_cmp,
        ) != 0
        ast.operands = deduped_children
        return ast, changed
    def transform_or(self, ast):
        return self.__transform(ast)
    def transform_and(self, ast):
        return self.__transform(ast)
class FlattenTransformer(ComparisonExpressionTransformer):
    """
    Flatten all nodes of the AST. E.g.:

        A and (B and C) => A and B and C
        A or (B or C) => A or B or C
        (A) => A
    """
    def __transform(self, ast):
        """
        Flatten children. AND and OR are handled identically, except that a
        node may only absorb same-operator children: AND absorbs AND children,
        OR absorbs OR children.

        :param ast: The comparison expression AST
        :return: 2-tuple of the flattened AST and a change-detection boolean
        """
        if len(ast.operands) == 1:
            # An AND/OR with a single child collapses to that child.
            return ast.operands[0], True
        new_operands = []
        did_change = False
        for child in ast.operands:
            absorbable = (
                isinstance(child, _BooleanExpression)
                and child.operator == ast.operator
            )
            if absorbable:
                # Hoist the grandchildren up into this node.
                new_operands.extend(child.operands)
                did_change = True
            else:
                new_operands.append(child)
        ast.operands = new_operands
        return ast, did_change

    def transform_or(self, ast):
        return self.__transform(ast)

    def transform_and(self, ast):
        return self.__transform(ast)
class AbsorptionTransformer(
    ComparisonExpressionTransformer
):
    """
    Applies boolean "absorption" rules for AST simplification. E.g.:

        A and (A or B) = A
        A or (A and B) = A
    """
    def __transform(self, ast):
        """
        Drop operands of ast which are absorbed by sibling operands.

        :param ast: The comparison expression AST (an AND or OR node)
        :return: 2-tuple of the AST and a change-detection boolean
        """
        changed = False
        # The child operator to which absorption applies: AND children of an
        # OR node, and vice versa.
        secondary_op = "AND" if ast.operator == "OR" else "OR"
        to_delete = set()
        # Check i (child1) against j to see if we can delete j.
        for i, child1 in enumerate(ast.operands):
            if i in to_delete:
                continue
            for j, child2 in enumerate(ast.operands):
                if i == j or j in to_delete:
                    continue
                # We're checking if child1 is contained in child2, so
                # child2 has to be a compound object, not just a simple
                # comparison expression. We also require the right operator
                # for child2: "AND" if ast is "OR" and vice versa.
                if not isinstance(child2, _BooleanExpression) \
                        or child2.operator != secondary_op:
                    continue
                # The simple check: is child1 contained in child2?
                if iter_in(
                    child1, child2.operands, comparison_expression_cmp,
                ):
                    to_delete.add(j)
                # A more complicated check: does child1 occur in child2
                # in a "flattened" form?
                elif child1.operator == child2.operator:
                    if all(
                        iter_in(
                            child1_operand, child2.operands,
                            comparison_expression_cmp,
                        )
                        for child1_operand in child1.operands
                    ):
                        to_delete.add(j)
        if to_delete:
            changed = True
            # Delete from the highest index downward, so the remaining
            # indices stay valid.
            for i in reversed(sorted(to_delete)):
                del ast.operands[i]
        return ast, changed
    def transform_or(self, ast):
        return self.__transform(ast)
    def transform_and(self, ast):
        return self.__transform(ast)
class DNFTransformer(ComparisonExpressionTransformer):
    """
    Convert a comparison expression AST to DNF. E.g.:

        A and (B or C) => (A and B) or (A and C)
    """
    def transform_and(self, ast):
        """
        Distribute this AND node over its OR children, producing an
        OR-of-ANDs.

        :param ast: An AND node of the comparison expression AST
        :return: 2-tuple of the transformed AST and a change-detection boolean
        """
        or_children = []
        other_children = []
        changed = False
        # Sort AND children into two piles: the ORs and everything else
        for child in ast.operands:
            if isinstance(child, _BooleanExpression) and child.operator == "OR":
                # Need a list of operand lists, so we can compute the
                # product below.
                or_children.append(child.operands)
            else:
                other_children.append(child)
        if or_children:
            # One new AND node per element of the cross product of the OR
            # children's operand lists; each new AND also carries all the
            # non-OR children.
            distributed_children = [
                AndBooleanExpression([
                    # Make dupes: distribution implies adding repetition, and
                    # we should ensure each repetition is independent of the
                    # others.
                    _dupe_ast(sub_ast) for sub_ast in itertools.chain(
                        other_children, prod_seq,
                    )
                ])
                for prod_seq in itertools.product(*or_children)
            ]
            # Need to recursively continue to distribute AND over OR in
            # any of our new sub-expressions which need it. This causes
            # more downward recursion in the midst of this bottom-up transform.
            # It's not good for performance. I wonder if a top-down
            # transformation algorithm would make more sense in this phase?
            # But then we'd be using two different algorithms for the same
            # thing... Maybe this transform should be completely top-down
            # (no bottom-up component at all)?
            distributed_children = [
                self.transform(child)[0] for child in distributed_children
            ]
            result = OrBooleanExpression(distributed_children)
            changed = True
        else:
            # No AND-over-OR; nothing to do
            result = ast
        return result, changed
class SpecialValueCanonicalization(ComparisonExpressionTransformer):
    """
    Try to find particular leaf-node comparison expressions whose rhs (i.e.
    the constant) can be canonicalized. This is an idiosyncratic
    transformation based on some ideas people had for context-sensitive
    semantic equivalence in constant values.
    """
    def transform_comparison(self, ast):
        # Dispatch on the object type of the comparison's left-hand side;
        # each special handler mutates the AST node in place.
        object_type = ast.lhs.object_type_name
        if object_type == "windows-registry-key":
            windows_reg_key(ast)
        elif object_type == "ipv4-addr":
            ipv4_addr(ast)
        elif object_type == "ipv6-addr":
            ipv6_addr(ast)
        # Hard-code False here since this particular canonicalization is
        # never worth doing more than once. I think it's okay to pretend
        # nothing has changed.
        return ast, False

View File

@ -0,0 +1,495 @@
"""
Transformation utilities for STIX pattern observation expressions.
"""
import functools
import itertools
from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp
from stix2.equivalence.patterns.compare.observation import (
observation_expression_cmp,
)
from stix2.equivalence.patterns.transform import (
ChainTransformer, SettleTransformer, Transformer,
)
from stix2.equivalence.patterns.transform.comparison import (
SpecialValueCanonicalization,
)
from stix2.equivalence.patterns.transform.comparison import \
AbsorptionTransformer as CAbsorptionTransformer
from stix2.equivalence.patterns.transform.comparison import \
DNFTransformer as CDNFTransformer
from stix2.equivalence.patterns.transform.comparison import \
FlattenTransformer as CFlattenTransformer
from stix2.equivalence.patterns.transform.comparison import \
OrderDedupeTransformer as COrderDedupeTransformer
from stix2.patterns import (
AndObservationExpression, FollowedByObservationExpression,
ObservationExpression, OrObservationExpression, ParentheticalExpression,
QualifiedObservationExpression, _CompoundObservationExpression,
)
def _dupe_ast(ast):
    """
    Create a duplicate of the given AST. The AST root must be an observation
    expression of some kind (AND/OR/qualified, etc).

    Note: the observation expression "leaves", i.e. simple square-bracket
    observation expressions are currently not duplicated. I don't think it's
    necessary as of this writing. But revisit this if/when necessary.

    :param ast: The AST to duplicate
    :return: The duplicate AST
    :raises TypeError: If the AST root is of an unsupported type
    """
    # Compound nodes duplicate recursively, preserving their node type.
    for compound_type in (
        AndObservationExpression, OrObservationExpression,
        FollowedByObservationExpression,
    ):
        if isinstance(ast, compound_type):
            return compound_type(
                [_dupe_ast(child) for child in ast.operands],
            )
    if isinstance(ast, QualifiedObservationExpression):
        # Don't need to dupe the qualifier object at this point
        return QualifiedObservationExpression(
            _dupe_ast(ast.observation_expression), ast.qualifier,
        )
    if isinstance(ast, ObservationExpression):
        # Leaves are shared, not copied.
        return ast
    raise TypeError("Can't duplicate " + type(ast).__name__)
class ObservationExpressionTransformer(Transformer):
    """
    Transformer base class with special support for transforming observation
    expressions.  The transform method implemented here performs a bottom-up
    in-place transformation, with support for some observation
    expression-specific callbacks.  It recurses down as far as the "leaf node"
    observation expressions; it does not go inside of them, to the individual
    components of a comparison expression.

    Specifically, subclasses can implement methods:

        "transform_or" for OR nodes
        "transform_and" for AND nodes
        "transform_followedby" for FOLLOWEDBY nodes
        "transform_qualified" for qualified nodes (all qualifier types)
        "transform_observation" for "leaf" observation expression nodes
        "transform_default" for all types of nodes

    "transform_default" is a fallback, if a type-specific callback is not
    found.  The default implementation does nothing to the AST.  The
    type-specific callbacks are preferred over the default, if both exist.

    In all cases, the callbacks are called with an AST for a subtree rooted at
    the appropriate node type, where the AST's children have already been
    transformed.  They must return the same thing as the base transform()
    method: a 2-tuple with the transformed AST and a boolean for change
    detection.  See doc for the superclass' method.

    This process currently silently drops parenthetical nodes.
    """

    # Determines how AST node types map to callback method names
    _DISPATCH_NAME_MAP = {
        ObservationExpression: "observation",
        AndObservationExpression: "and",
        OrObservationExpression: "or",
        FollowedByObservationExpression: "followedby",
        QualifiedObservationExpression: "qualified",
    }

    def transform(self, ast):
        """
        Transform the given observation expression AST bottom-up, in place.

        :param ast: The AST to transform
        :return: A 2-tuple: the transformed AST and a boolean indicating
            whether any change was made
        :raises TypeError: If the AST root is not an observation expression
            node type
        """
        changed = False
        if isinstance(ast, ObservationExpression):
            # A "leaf node" for observation expressions.  We don't recurse into
            # these.
            result, this_changed = self.__dispatch_transform(ast)
            if this_changed:
                changed = True
        elif isinstance(ast, _CompoundObservationExpression):
            # AND/OR/FOLLOWEDBY: transform children first (bottom-up), then
            # dispatch on this node itself.
            for i, operand in enumerate(ast.operands):
                result, this_changed = self.transform(operand)
                if this_changed:
                    ast.operands[i] = result
                    changed = True
            result, this_changed = self.__dispatch_transform(ast)
            if this_changed:
                changed = True
        elif isinstance(ast, QualifiedObservationExpression):
            # I don't think we need to process/transform the qualifier by
            # itself, do we?
            result, this_changed = self.transform(ast.observation_expression)
            if this_changed:
                ast.observation_expression = result
                changed = True
            result, this_changed = self.__dispatch_transform(ast)
            if this_changed:
                changed = True
        elif isinstance(ast, ParentheticalExpression):
            # Parentheticals are dropped: the transformed inner expression
            # replaces the parenthetical node entirely.
            result, _ = self.transform(ast.expression)
            # Dropping a node is a change, right?
            changed = True
        else:
            raise TypeError("Not an observation expression: {}: {}".format(
                type(ast).__name__, str(ast),
            ))
        return result, changed

    def __dispatch_transform(self, ast):
        """
        Invoke a transformer callback method based on the given ast root node
        type.

        :param ast: The AST
        :return: The callback's result
        """
        dispatch_name = self._DISPATCH_NAME_MAP.get(type(ast))
        if dispatch_name:
            meth_name = "transform_" + dispatch_name
            # Fall back to transform_default if the subclass did not define
            # the type-specific callback.
            meth = getattr(self, meth_name, self.transform_default)
        else:
            meth = self.transform_default
        return meth(ast)

    def transform_default(self, ast):
        """Default callback: leave the AST unchanged."""
        return ast, False
class FlattenTransformer(ObservationExpressionTransformer):
    """
    Flatten an observation expression AST, e.g.:

        A and (B and C) => A and B and C
        A or (B or C) => A or B or C
        A followedby (B followedby C) => A followedby B followedby C
        (A) => A
    """

    def __transform(self, ast):
        # An AND/OR/FOLLOWEDBY node with a single operand is equivalent to
        # that operand; replace the node with its child.
        if len(ast.operands) == 1:
            return ast.operands[0], True

        merged = []
        changed = False
        for child in ast.operands:
            same_connective = (
                isinstance(child, _CompoundObservationExpression)
                and ast.operator == child.operator
            )
            if same_connective:
                # Same connective as the parent: splice the grandchildren in
                # directly.
                merged.extend(child.operands)
                changed = True
            else:
                merged.append(child)

        ast.operands = merged
        return ast, changed

    def transform_and(self, ast):
        return self.__transform(ast)

    def transform_or(self, ast):
        return self.__transform(ast)

    def transform_followedby(self, ast):
        return self.__transform(ast)
class OrderDedupeTransformer(
    ObservationExpressionTransformer
):
    """
    Canonically order the children of AND/OR nodes, and dedupe ORs.  E.g.:

        A or A => A
        B or A => A or B
        B and A => A and B
    """

    def __transform(self, ast):
        sort_key = functools.cmp_to_key(observation_expression_cmp)
        ordered_children = sorted(ast.operands, key=sort_key)

        if ast.operator == "OR":
            # Deduping only applies to ORs.  Children are sorted, so equal
            # children are adjacent; keep one representative per group.
            new_children = [
                group_key.obj
                for group_key, _ in itertools.groupby(
                    ordered_children, key=sort_key,
                )
            ]
        else:
            new_children = ordered_children

        # Change detection: did sorting/deduping alter the child sequence?
        changed = iter_lex_cmp(
            ast.operands, new_children, observation_expression_cmp,
        ) != 0

        ast.operands = new_children

        return ast, changed

    def transform_and(self, ast):
        return self.__transform(ast)

    def transform_or(self, ast):
        return self.__transform(ast)
class AbsorptionTransformer(
    ObservationExpressionTransformer
):
    """
    Applies boolean "absorption" rules for observation expressions, for AST
    simplification:

        A or (A and B) = A
        A or (A followedby B) = A

    Other variants do not hold for observation expressions.
    """

    def __is_contained_and(self, exprs_containee, exprs_container):
        """
        Determine whether the "containee" expressions are contained in the
        "container" expressions, with AND semantics (order-independent but
        requiring distinct bindings).  For example (containee on the left,
        container on the right):

            (A and A and B) or (A and B and C)

        In the above, all of the lhs vars have a counterpart in the rhs, but
        there are two A's on the left and only one on the right.  Therefore,
        the right does not "contain" the left.  You would need two A's on the
        right.

        :param exprs_containee: The expressions we want to check for
            containment
        :param exprs_container: The expressions acting as the "container"
        :return: True if the containee is contained in the container; False if
            not
        """
        # Make our own list we are free to manipulate without affecting the
        # function args.
        container = list(exprs_container)
        for ee in exprs_containee:
            for i, er in enumerate(container):
                if observation_expression_cmp(ee, er) == 0:
                    # Found a match in the container; delete it so we never
                    # try to match a container expr to two different containee
                    # expressions.
                    del container[i]
                    break
            else:
                # No container expression matched this containee expression.
                return False

        return True

    def __is_contained_followedby(self, exprs_containee, exprs_container):
        """
        Determine whether the "containee" expressions are contained in the
        "container" expressions, with FOLLOWEDBY semantics (order-sensitive
        and requiring distinct bindings).  For example (containee on the left,
        container on the right):

            (A followedby B) or (B followedby A)

        In the above, all of the lhs vars have a counterpart in the rhs, but
        the vars on the right are not in the same order.  Therefore, the right
        does not "contain" the left.  The container vars don't have to be
        contiguous though.  E.g. in:

            (A followedby B) or (D followedby A followedby C followedby B)

        in the container (rhs), B follows A, so it "contains" the lhs even
        though there is other stuff mixed in.

        :param exprs_containee: The expressions we want to check for
            containment
        :param exprs_container: The expressions acting as the "container"
        :return: True if the containee is contained in the container; False if
            not
        """
        # Never rewind er_iter: FOLLOWEDBY containment is order-sensitive, so
        # each containee expression must match strictly after the previous
        # match in the container.
        #
        # NOTE: a previous version detected iterator exhaustion via node
        # truthiness ("if not er"), which would misbehave for any AST node
        # that happened to evaluate falsy.  The for/else form below relies
        # only on the iterator protocol.
        er_iter = iter(exprs_container)
        for ee in exprs_containee:
            for er in er_iter:
                if observation_expression_cmp(ee, er) == 0:
                    break
            else:
                # Container expressions ran out before a match was found.
                return False

        return True

    def transform_or(self, ast):
        """
        Simplify an OR node by deleting operands which are "absorbed" by
        other operands.

        :param ast: The OR node
        :return: 2-tuple of the (possibly simplified) AST and a change flag
        """
        changed = False
        to_delete = set()
        for i, child1 in enumerate(ast.operands):
            if i in to_delete:
                continue

            # The simplification doesn't work across qualifiers
            if isinstance(child1, QualifiedObservationExpression):
                continue

            for j, child2 in enumerate(ast.operands):
                if i == j or j in to_delete:
                    continue

                if isinstance(
                    child2, (
                        AndObservationExpression,
                        FollowedByObservationExpression,
                    ),
                ):
                    # The simple check: is child1 contained in child2?
                    if iter_in(
                        child1, child2.operands, observation_expression_cmp,
                    ):
                        to_delete.add(j)

                    # A more complicated check: does child1 occur in child2
                    # in a "flattened" form?
                    elif type(child1) is type(child2):
                        if isinstance(child1, AndObservationExpression):
                            can_simplify = self.__is_contained_and(
                                child1.operands, child2.operands,
                            )
                        else:  # child1 and 2 are followedby nodes
                            can_simplify = self.__is_contained_followedby(
                                child1.operands, child2.operands,
                            )

                        if can_simplify:
                            to_delete.add(j)

        if to_delete:
            changed = True
            # Delete from highest index to lowest so earlier deletions don't
            # shift the later indices.
            for i in reversed(sorted(to_delete)):
                del ast.operands[i]

        return ast, changed
class DNFTransformer(ObservationExpressionTransformer):
    """
    Transform an observation expression to DNF.  This distributes AND and
    FOLLOWEDBY over OR:

        A and (B or C) => (A and B) or (A and C)
        A followedby (B or C) => (A followedby B) or (A followedby C)
    """

    def __transform(self, ast):
        node_type = type(ast)  # AST class for AND or FOLLOWEDBY

        # Partition children: operand lists of OR children (to distribute
        # over) vs. everything else.
        or_operand_lists = []
        non_or_children = []
        for child in ast.operands:
            if isinstance(child, OrObservationExpression):
                or_operand_lists.append(child.operands)
            else:
                non_or_children.append(child)

        if not or_operand_lists:
            # Nothing to distribute over.
            return ast, False

        distributed_terms = []
        for combo in itertools.product(*or_operand_lists):
            term = node_type([
                _dupe_ast(sub_ast)
                for sub_ast in itertools.chain(non_or_children, combo)
            ])
            # Recursively continue to distribute AND/FOLLOWEDBY over OR in
            # any of the new sub-expressions which need it.
            distributed_terms.append(self.transform(term)[0])

        return OrObservationExpression(distributed_terms), True

    def transform_and(self, ast):
        return self.__transform(ast)

    def transform_followedby(self, ast):
        return self.__transform(ast)
class CanonicalizeComparisonExpressionsTransformer(
    ObservationExpressionTransformer
):
    """
    Canonicalize the comparison expression inside every "leaf" observation
    expression of an observation expression AST.
    """

    def __init__(self):
        # Build the comparison-expression pipeline once: the component
        # transformers carry no per-use state, so the chain is reusable.
        settle_simplify = SettleTransformer(
            ChainTransformer(
                CFlattenTransformer(),
                COrderDedupeTransformer(),
                CAbsorptionTransformer(),
            ),
        )

        self.__comp_canonicalize = ChainTransformer(
            SpecialValueCanonicalization(),
            settle_simplify,
            CDNFTransformer(),
            settle_simplify,
        )

    def transform_observation(self, ast):
        new_comp_expr, changed = self.__comp_canonicalize.transform(
            ast.operand,
        )
        ast.operand = new_comp_expr

        return ast, changed

View File

@ -0,0 +1,227 @@
"""
Some simple comparison expression canonicalization functions.
"""
import socket
from stix2.equivalence.patterns.compare.comparison import (
object_path_to_raw_values,
)
# Sentinel values usable as wildcards in path patterns passed to _path_is():
_ANY_IDX = object()  # matches any index path step (an int, or "*")
_ANY_KEY = object()  # matches any key (string) path step
_ANY = object()  # matches any path step at all
def _path_is(object_path, path_pattern):
    """
    Compare an object path against a pattern.  This enables simple path
    recognition based on a pattern, which is slightly more flexible than exact
    equality: it supports some simple wildcards.

    The path pattern must be an iterable of values: strings for key path
    steps, ints or "*" for index path steps, or wildcards.  Exact matches are
    required for non-wildcards in the pattern.  For the wildcards, _ANY_IDX
    matches any index path step; _ANY_KEY matches any key path step, and _ANY
    matches any path step.

    :param object_path: An ObjectPath instance
    :param path_pattern: An iterable giving the pattern path steps
    :return: True if the path matches the pattern; False if not
    """
    path_iter = iter(object_path_to_raw_values(object_path))
    patt_iter = iter(path_pattern)

    while True:
        path_step = next(path_iter, None)
        patt_step = next(patt_iter, None)

        if path_step is None and patt_step is None:
            # Both sequences exhausted together: full match.
            return True

        if path_step is None or patt_step is None:
            # Unequal-length sequences: no match.
            return False

        if patt_step is _ANY:
            continue

        if patt_step is _ANY_IDX:
            # Index steps are ints or the "*" wildcard index.
            if not (isinstance(path_step, int) or path_step == "*"):
                return False
        elif patt_step is _ANY_KEY:
            if not isinstance(path_step, str):
                return False
        elif patt_step != path_step:
            return False
def _mask_bytes(ip_bytes, prefix_size):
    """
    Retain the high-order 'prefix_size' bits from ip_bytes, and zero out the
    remaining low-order bits.  This side-effects ip_bytes.

    :param ip_bytes: A mutable byte sequence (e.g. a bytearray)
    :param prefix_size: An integer prefix size
    """
    addr_size_bits = 8 * len(ip_bytes)
    assert 0 <= prefix_size <= addr_size_bits

    whole_bytes, partial_bits = divmod(prefix_size, 8)
    # Index of the first byte which lies entirely outside the prefix.
    zero_start = whole_bytes + (1 if partial_bits else 0)

    # Zero every byte wholly outside the prefix.
    if zero_start < len(ip_bytes):
        ip_bytes[zero_start:] = b"\x00" * (len(ip_bytes) - zero_start)

    if partial_bits:
        # The prefix boundary falls mid-byte: keep only the high-order
        # 'partial_bits' bits of that byte.
        ip_bytes[whole_bytes] &= ((1 << partial_bits) - 1) << (8 - partial_bits)
def windows_reg_key(comp_expr):
    """
    Lower-cases the rhs, depending on the windows-registry-key property being
    compared.  This enables case-insensitive comparisons between two patterns,
    for those values.  This side-effects the given AST.

    :param comp_expr: A _ComparisonExpression object whose type is
        windows-registry-key
    """
    # Only registry key paths and value names are case-insensitive.
    case_insensitive_paths = (
        ("key",),
        ("values", _ANY_IDX, "name"),
    )
    if any(
        _path_is(comp_expr.lhs, patt) for patt in case_insensitive_paths
    ):
        comp_expr.rhs.value = comp_expr.rhs.value.lower()
def ipv4_addr(comp_expr):
    """
    Canonicalizes a CIDR IPv4 address by zeroing out low-order bits, according
    to the prefix size.  This affects the rhs when the "value" property of an
    ipv4-addr is being compared.  If the prefix size is 32, the size suffix is
    simply dropped since it's redundant.  If the value is not a valid CIDR
    address, then no change is made.  This also runs the address through the
    platform's IPv4 address processing functions (inet_aton() and
    inet_ntoa()), which can adjust the format.

    This side-effects the given AST.

    :param comp_expr: A _ComparisonExpression object whose type is ipv4-addr.
    """
    if not _path_is(comp_expr.lhs, ("value",)):
        return

    value = comp_expr.rhs.value
    ip_str, slash, prefix_str = value.partition("/")
    is_cidr = bool(slash)

    try:
        packed_addr = socket.inet_aton(ip_str)
    except OSError:
        # Illegal IPv4 address string; leave the value untouched.
        return

    if is_cidr:
        try:
            prefix_size = int(prefix_str)
        except ValueError:
            # Illegal (non-integer) prefix size; leave the value untouched.
            return
        if prefix_size < 0 or prefix_size > 32:
            # Illegal (out-of-range) prefix size; leave the value untouched.
            return

    if not is_cidr or prefix_size == 32:
        # If a CIDR with prefix size 32, drop the prefix size since it's
        # redundant.  Run the address bytes through inet_ntoa() in case it
        # would adjust the format (e.g. drop leading zeros:
        # 1.2.3.004 => 1.2.3.4).
        comp_expr.rhs.value = socket.inet_ntoa(packed_addr)
    else:
        # inet_aton() gives an immutable 'bytes' value; we need a value we
        # can change.
        masked_addr = bytearray(packed_addr)
        _mask_bytes(masked_addr, prefix_size)
        comp_expr.rhs.value = \
            socket.inet_ntoa(masked_addr) + "/" + str(prefix_size)
def ipv6_addr(comp_expr):
    """
    Canonicalizes a CIDR IPv6 address by zeroing out low-order bits, according
    to the prefix size.  This affects the rhs when the "value" property of an
    ipv6-addr is being compared.  If the prefix size is 128, the size suffix
    is simply dropped since it's redundant.  If the value is not a valid CIDR
    address, then no change is made.  This also runs the address through the
    platform's IPv6 address processing functions (inet_pton() and
    inet_ntop()), which can adjust the format.

    This side-effects the given AST.

    :param comp_expr: A _ComparisonExpression object whose type is ipv6-addr.
    """
    if not _path_is(comp_expr.lhs, ("value",)):
        return

    value = comp_expr.rhs.value
    ip_str, slash, prefix_str = value.partition("/")
    is_cidr = bool(slash)

    try:
        packed_addr = socket.inet_pton(socket.AF_INET6, ip_str)
    except OSError:
        # Illegal IPv6 address string; leave the value untouched.
        return

    if is_cidr:
        try:
            prefix_size = int(prefix_str)
        except ValueError:
            # Illegal (non-integer) prefix size; leave the value untouched.
            return
        if prefix_size < 0 or prefix_size > 128:
            # Illegal (out-of-range) prefix size; leave the value untouched.
            return

    if not is_cidr or prefix_size == 128:
        # If a CIDR with prefix size 128, drop the prefix size since it's
        # redundant.  Run the IP address through inet_ntop() so it can
        # reformat with the double-colons (and make any other adjustments)
        # if necessary.
        comp_expr.rhs.value = socket.inet_ntop(socket.AF_INET6, packed_addr)
    else:
        # inet_pton() gives an immutable 'bytes' value; we need a value we
        # can change.
        masked_addr = bytearray(packed_addr)
        _mask_bytes(masked_addr, prefix_size)
        comp_expr.rhs.value = \
            socket.inet_ntop(socket.AF_INET6, masked_addr) \
            + "/" + str(prefix_size)

View File

@ -2,8 +2,8 @@
import importlib
import inspect
from six import text_type
from six import text_type
from stix2patterns.exceptions import ParseException
from stix2patterns.grammars.STIXPatternParser import TerminalNode
from stix2patterns.v20.grammars.STIXPatternParser import \
@ -261,9 +261,11 @@ class STIXPatternVisitorForSTIX2():
property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText()))
i += 2
elif isinstance(next, IntegerConstant):
property_path.append(self.instantiate("ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current),
next.value))
property_path.append(self.instantiate(
"ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current),
next.value,
))
i += 2
else:
property_path.append(current)

View File

@ -0,0 +1,634 @@
import pytest
from stix2.equivalence.patterns import (
equivalent_patterns, find_equivalent_patterns,
)
# # # #
# # Observation expression equivalence tests # #
# # # #
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] OR [a:b=1]",
            "[a:b=1]",
        ),
        (
            "[a:b=1] OR [a:b=1] OR [a:b=1]",
            "[a:b=1]",
        ),
    ],
)
def test_obs_dupe_equivalent(patt1, patt2):
    """Duplicate observation expressions under OR collapse to one."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] AND [a:b=1]",
            "[a:b=1]",
        ),
        (
            "[a:b=1] FOLLOWEDBY [a:b=1]",
            "[a:b=1]",
        ),
    ],
)
def test_obs_dupe_not_equivalent(patt1, patt2):
    """AND/FOLLOWEDBY need distinct bindings, so dupes don't collapse."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        ("[a:b=1]", "([a:b=1])"),
        ("(((([a:b=1]))))", "([a:b=1])"),
        (
            "[a:b=1] AND ([a:b=2] AND [a:b=3])",
            "[a:b=1] AND [a:b=2] AND [a:b=3]",
        ),
        (
            "([a:b=1] AND [a:b=2]) AND [a:b=3]",
            "[a:b=1] AND ([a:b=2] AND [a:b=3])",
        ),
        (
            "[a:b=1] OR ([a:b=2] OR [a:b=3])",
            "[a:b=1] OR [a:b=2] OR [a:b=3]",
        ),
        (
            "([a:b=1] OR [a:b=2]) OR [a:b=3]",
            "[a:b=1] OR ([a:b=2] OR [a:b=3])",
        ),
        (
            "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3])",
            "[a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]",
        ),
        (
            "([a:b=1] FOLLOWEDBY [a:b=2]) FOLLOWEDBY [a:b=3]",
            "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3])",
        ),
        (
            "[a:b=1] AND ([a:b=2] AND ([a:b=3] AND [a:b=4])) AND ([a:b=5])",
            "([a:b=1] AND ([a:b=2] AND [a:b=3]) AND ([a:b=4] AND [a:b=5]))",
        ),
    ],
)
def test_obs_flatten_equivalent(patt1, patt2):
    """Parentheses/grouping of a single connective don't affect meaning."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "([a:b=1] AND [a:b=2]) OR [a:b=3]",
            "[a:b=1] AND ([a:b=2] OR [a:b=3])",
        ),
        (
            "([a:b=1] OR [a:b=2]) FOLLOWEDBY [a:b=3]",
            "[a:b=1] OR ([a:b=2] FOLLOWEDBY [a:b=3])",
        ),
        ("[a:b=1]", "([a:b=1]) REPEATS 2 TIMES"),
        ("(((([a:b=1]))))", "([a:b=1] REPEATS 2 TIMES)"),
        (
            "[a:b=1] AND ([a:b=2] AND [a:b=3]) WITHIN 2 SECONDS",
            "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] AND [a:b=3]",
        ),
        (
            "[a:b=1] OR ([a:b=2] OR [a:b=3]) WITHIN 2 SECONDS",
            "[a:b=1] WITHIN 2 SECONDS OR [a:b=2] OR [a:b=3]",
        ),
        (
            "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3]) WITHIN 2 SECONDS",
            "[a:b=1] WITHIN 2 SECONDS FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]",
        ),
    ],
)
def test_obs_flatten_not_equivalent(patt1, patt2):
    """Mixed connectives or qualifiers block flattening-based equivalence."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] AND [a:b=2]",
            "[a:b=2] AND [a:b=1]",
        ),
        (
            "[a:b=1] OR [a:b=2]",
            "[a:b=2] OR [a:b=1]",
        ),
        (
            "[a:b=1] OR ([a:b=2] AND [a:b=3])",
            "([a:b=3] AND [a:b=2]) OR [a:b=1]",
        ),
        (
            "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES",
            "[a:b=2] REPEATS 2 TIMES AND [a:b=1] WITHIN 2 SECONDS",
        ),
    ],
)
def test_obs_order_equivalent(patt1, patt2):
    """AND/OR are commutative, including with qualified operands."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] FOLLOWEDBY [a:b=2]",
            "[a:b=2] FOLLOWEDBY [a:b=1]",
        ),
        (
            "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES",
            "[a:b=1] REPEATS 2 TIMES AND [a:b=2] WITHIN 2 SECONDS",
        ),
    ],
)
def test_obs_order_not_equivalent(patt1, patt2):
    """FOLLOWEDBY is order-sensitive; swapped qualifiers change meaning."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] OR ([a:b=1] AND [a:b=2])",
            "[a:b=1]",
        ),
        (
            "[a:b=1] OR ([a:b=1] FOLLOWEDBY [a:b=2])",
            "[a:b=1]",
        ),
        (
            "([a:b=3] AND [a:b=1]) OR ([a:b=1] AND [a:b=2] AND [a:b=3])",
            "[a:b=3] AND [a:b=1]",
        ),
        (
            "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=4] FOLLOWEDBY [a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])",
            "[a:b=1] FOLLOWEDBY [a:b=3]",
        ),
        (
            "([a:b=1] FOLLOWEDBY [a:b=2]) OR (([a:b=1] FOLLOWEDBY [a:b=2]) AND [a:b=3])",
            "[a:b=1] FOLLOWEDBY [a:b=2]",
        ),
        (
            "([a:b=1] AND [a:b=2]) OR (([a:b=1] AND [a:b=2]) FOLLOWEDBY [a:b=3])",
            "[a:b=1] AND [a:b=2]",
        ),
    ],
)
def test_obs_absorb_equivalent(patt1, patt2):
    """A or (A and/followedby B) absorbs to A."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "([a:b=1] AND [a:b=2]) OR ([a:b=2] AND [a:b=3] AND [a:b=4])",
            "[a:b=1] AND [a:b=2]",
        ),
        (
            "([a:b=2] FOLLOWEDBY [a:b=1]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])",
            "[a:b=2] FOLLOWEDBY [a:b=1]",
        ),
    ],
)
def test_obs_absorb_not_equivalent(patt1, patt2):
    """Absorption requires full (order-correct) containment."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] AND ([a:b=2] OR [a:b=3])",
            "([a:b=1] AND [a:b=2]) OR ([a:b=1] AND [a:b=3])",
        ),
        (
            "[a:b=1] FOLLOWEDBY ([a:b=2] OR [a:b=3])",
            "([a:b=1] FOLLOWEDBY [a:b=2]) OR ([a:b=1] FOLLOWEDBY [a:b=3])",
        ),
        (
            "[a:b=1] AND ([a:b=2] AND ([a:b=3] OR [a:b=4]))",
            "([a:b=1] AND [a:b=2] AND [a:b=3]) OR ([a:b=1] AND [a:b=2] AND [a:b=4])",
        ),
        (
            "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY ([a:b=3] OR [a:b=4]))",
            "([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=4])",
        ),
        (
            "([a:b=1] OR [a:b=2]) AND ([a:b=3] OR [a:b=4])",
            "([a:b=1] AND [a:b=3]) OR ([a:b=1] AND [a:b=4]) OR ([a:b=2] AND [a:b=3]) OR ([a:b=2] AND [a:b=4])",
        ),
        (
            "([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=3] OR [a:b=4])",
            "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])",
        ),
    ],
)
def test_obs_dnf_equivalent(patt1, patt2):
    """Distributing AND/FOLLOWEDBY over OR (DNF) preserves meaning."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] AND [a:b=2]",
            "[a:b=1] OR [a:b=2]",
        ),
        (
            "[a:b=1] AND ([a:b=2] OR [a:b=3])",
            "([a:b=1] AND [a:b=2]) OR [a:b=3]",
        ),
        (
            "[a:b=1] WITHIN 2 SECONDS",
            "[a:b=1] REPEATS 2 TIMES",
        ),
    ],
)
def test_obs_not_equivalent(patt1, patt2):
    """Genuinely different observation expressions are not equivalent."""
    assert not equivalent_patterns(patt1, patt2)
# # Comparison expression equivalence tests # #
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1 AND a:b=1]",
            "[a:b=1]",
        ),
        (
            "[a:b=1 AND a:b=1 AND a:b=1]",
            "[a:b=1]",
        ),
        (
            "[a:b=1 OR a:b=1]",
            "[a:b=1]",
        ),
        (
            "[a:b=1 OR a:b=1 OR a:b=1]",
            "[a:b=1]",
        ),
    ],
)
def test_comp_dupe_equivalent(patt1, patt2):
    """Duplicate comparison expressions collapse under both AND and OR."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[(a:b=1)]",
            "[a:b=1]",
        ),
        (
            "[(((((a:b=1)))))]",
            "[(a:b=1)]",
        ),
        (
            "[a:b=1 AND (a:b=2 AND a:b=3)]",
            "[(a:b=1 AND a:b=2) AND a:b=3]",
        ),
        (
            "[a:b=1 OR (a:b=2 OR a:b=3)]",
            "[(a:b=1 OR a:b=2) OR a:b=3]",
        ),
        (
            "[(((a:b=1 AND ((a:b=2) AND a:b=3) AND (a:b=4))))]",
            "[a:b=1 AND a:b=2 AND a:b=3 AND a:b=4]",
        ),
        (
            "[(((a:b=1 OR ((a:b=2) OR a:b=3) OR (a:b=4))))]",
            "[a:b=1 OR a:b=2 OR a:b=3 OR a:b=4]",
        ),
    ],
)
def test_comp_flatten_equivalent(patt1, patt2):
    """Parenthesization within one connective doesn't affect meaning."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1 AND a:b=2]",
            "[a:b=2 AND a:b=1]",
        ),
        (
            "[a:b=1 OR a:b=2]",
            "[a:b=2 OR a:b=1]",
        ),
        (
            "[(a:b=1 OR a:b=2) AND a:b=3]",
            "[a:b=3 AND (a:b=2 OR a:b=1)]",
        ),
    ],
)
def test_comp_order_equivalent(patt1, patt2):
    """Comparison AND/OR are commutative."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1 OR (a:b=1 AND a:b=2)]",
            "[a:b=1]",
        ),
        (
            "[a:b=1 AND (a:b=1 OR a:b=2)]",
            "[a:b=1]",
        ),
        (
            "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=2 AND a:b=1)]",
            "[a:b=1 AND a:b=2]",
        ),
        (
            "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=2 OR a:b=1)]",
            "[a:b=1 OR a:b=2]",
        ),
    ],
)
def test_comp_absorb_equivalent(patt1, patt2):
    """Both boolean absorption laws hold for comparison expressions."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1 OR (a:b=2 AND a:b=3)]",
            "[(a:b=1 OR a:b=2) AND (a:b=1 OR a:b=3)]",
        ),
        (
            "[a:b=1 AND (a:b=2 OR a:b=3)]",
            "[(a:b=1 AND a:b=2) OR (a:b=1 AND a:b=3)]",
        ),
        (
            "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=4)]",
            "[(a:b=1 OR a:b=3) AND (a:b=1 OR a:b=4) AND (a:b=2 OR a:b=3) AND (a:b=2 OR a:b=4)]",
        ),
        (
            "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=4)]",
            "[(a:b=1 AND a:b=3) OR (a:b=1 AND a:b=4) OR (a:b=2 AND a:b=3) OR (a:b=2 AND a:b=4)]",
        ),
        (
            "[a:b=1 AND (a:b=2 AND (a:b=3 OR a:b=4))]",
            "[(a:b=1 AND a:b=2 AND a:b=3) OR (a:b=1 AND a:b=2 AND a:b=4)]",
        ),
    ],
)
def test_comp_dnf_equivalent(patt1, patt2):
    """Distribution (both directions) preserves comparison semantics."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1]",
            "[a:b=2]",
        ),
        (
            "[a:b=1 AND a:b=2]",
            "[a:b=1 OR a:b=2]",
        ),
        (
            "[(a:b=1 AND a:b=2) OR a:b=3]",
            "[a:b=1 AND (a:b=2 OR a:b=3)]",
        ),
    ],
)
def test_comp_not_equivalent(patt1, patt2):
    """Genuinely different comparison expressions are not equivalent."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[ipv4-addr:value='1.2.3.4/32']",
            "[ipv4-addr:value='1.2.3.4']",
        ),
        (
            "[ipv4-addr:value='1.2.3.4/24']",
            "[ipv4-addr:value='1.2.3.0/24']",
        ),
        (
            "[ipv4-addr:value='1.2.255.4/23']",
            "[ipv4-addr:value='1.2.254.0/23']",
        ),
        (
            "[ipv4-addr:value='1.2.255.4/20']",
            "[ipv4-addr:value='1.2.240.0/20']",
        ),
        (
            "[ipv4-addr:value='1.2.255.4/0']",
            "[ipv4-addr:value='0.0.0.0/0']",
        ),
        (
            "[ipv4-addr:value='01.02.03.04']",
            "[ipv4-addr:value='1.2.3.4']",
        ),
        (
            "[ipv4-addr:value='1.2.3.4/-5']",
            "[ipv4-addr:value='1.2.3.4/-5']",
        ),
        (
            "[ipv4-addr:value='1.2.3.4/99']",
            "[ipv4-addr:value='1.2.3.4/99']",
        ),
        (
            "[ipv4-addr:value='foo']",
            "[ipv4-addr:value='foo']",
        ),
    ],
)
def test_comp_special_canonicalization_ipv4(patt1, patt2):
    """IPv4 CIDR canonicalization: masking, /32 drop, invalid left as-is."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[ipv4-addr:value='1.2.3.4']",
            "[ipv4-addr:value='1.2.3.5']",
        ),
        (
            "[ipv4-addr:value='1.2.3.4/1']",
            "[ipv4-addr:value='1.2.3.4/2']",
        ),
        (
            "[ipv4-addr:value='foo']",
            "[ipv4-addr:value='bar']",
        ),
    ],
)
def test_comp_special_canonicalization_ipv4_not_equivalent(patt1, patt2):
    """Different IPv4 addresses/prefixes stay non-equivalent."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/128']",
            "[ipv6-addr:value='1:2:3:4:5:6:7:8']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/112']",
            "[ipv6-addr:value='1:2:3:4:5:6:7:0/112']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/111']",
            "[ipv6-addr:value='1:2:3:4:5:6:fffe:0/111']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/104']",
            "[ipv6-addr:value='1:2:3:4:5:6:ff00:0/104']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/0']",
            "[ipv6-addr:value='0:0:0:0:0:0:0:0/0']",
        ),
        (
            "[ipv6-addr:value='0001:0000:0000:0000:0000:0000:0000:0001']",
            "[ipv6-addr:value='1::1']",
        ),
        (
            "[ipv6-addr:value='0000:0000:0000:0000:0000:0000:0000:0000']",
            "[ipv6-addr:value='::']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']",
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']",
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']",
        ),
        (
            "[ipv6-addr:value='foo']",
            "[ipv6-addr:value='foo']",
        ),
    ],
)
def test_comp_special_canonicalization_ipv6(patt1, patt2):
    """IPv6 CIDR canonicalization: masking, /128 drop, "::" normalization."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8']",
            "[ipv6-addr:value='1:2:3:4:5:6:7:9']",
        ),
        (
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/1']",
            "[ipv6-addr:value='1:2:3:4:5:6:7:8/2']",
        ),
        (
            "[ipv6-addr:value='foo']",
            "[ipv6-addr:value='bar']",
        ),
    ],
)
def test_comp_special_canonicalization_ipv6_not_equivalent(patt1, patt2):
    """Different IPv6 addresses/prefixes stay non-equivalent."""
    assert not equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[windows-registry-key:key = 'aaa']",
            "[windows-registry-key:key = 'AAA']",
        ),
        (
            "[windows-registry-key:values[0].name = 'aaa']",
            "[windows-registry-key:values[0].name = 'AAA']",
        ),
        (
            "[windows-registry-key:values[*].name = 'aaa']",
            "[windows-registry-key:values[*].name = 'AAA']",
        ),
    ],
)
def test_comp_special_canonicalization_win_reg_key(patt1, patt2):
    """Registry key paths and value names compare case-insensitively."""
    assert equivalent_patterns(patt1, patt2)
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[windows-registry-key:key='foo']",
            "[windows-registry-key:key='bar']",
        ),
        (
            "[windows-registry-key:values[0].name='foo']",
            "[windows-registry-key:values[0].name='bar']",
        ),
        (
            "[windows-registry-key:values[*].name='foo']",
            "[windows-registry-key:values[*].name='bar']",
        ),
        (
            "[windows-registry-key:values[*].data='foo']",
            "[windows-registry-key:values[*].data='FOO']",
        ),
    ],
)
def test_comp_special_canonicalization_win_reg_key_not_equivalent(patt1, patt2):
    """Different names, and value *data* (case-sensitive), don't match."""
    assert not equivalent_patterns(patt1, patt2)
def test_comp_other_constant_types():
    """Each constant type is equivalent only to itself (pairwise check)."""
    constants = [
        "1.23",
        "1",
        "true",
        "false",
        "h'4fa2'",
        "b'ZmpoZWll'",
        "t'1982-12-31T02:14:17.232Z'",
    ]
    pattern_template = "[a:b={}]"
    for i, const1 in enumerate(constants):
        for j, const2 in enumerate(constants):
            patt1 = pattern_template.format(const1)
            patt2 = pattern_template.format(const2)
            if i == j:
                assert equivalent_patterns(patt1, patt2)
            else:
                assert not equivalent_patterns(patt1, patt2)
    # can't use an "=" pattern with lists...
    for const in constants:
        patt1 = "[a:b={}]".format(const)
        patt2 = "[a:b IN (1,2,3)]"
        assert not equivalent_patterns(patt1, patt2)
# # find_equivalent_patterns() tests # #
def test_find_equivalent_patterns():
    """find_equivalent_patterns() yields only patterns equivalent to the
    search pattern, preserving input order."""
    search_pattern = "[a:b=1]"
    other_patterns = [
        "[a:b=2]",
        "[a:b=1]",
        "[a:b=1] WITHIN 1 SECONDS",
        "[a:b=1] OR ([a:b=2] AND [a:b=1])",
        "[(a:b=2 OR a:b=1) AND a:b=1]",
        "[c:d=1]",
        "[a:b>1]",
    ]
    result = list(
        find_equivalent_patterns(search_pattern, other_patterns),
    )
    assert result == [
        "[a:b=1]",
        "[a:b=1] OR ([a:b=2] AND [a:b=1])",
        "[(a:b=2 OR a:b=1) AND a:b=1]",
    ]

View File

@ -0,0 +1,47 @@
"""
Pattern equivalence unit tests which use STIX 2.0-specific pattern features
"""
import pytest
from stix2.equivalence.patterns import equivalent_patterns
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
        ),
        (
            "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
        ),
        (
            "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES",
            "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES",
        ),
    ],
)
def test_startstop_equivalent(patt1, patt2):
    """Matching STIX 2.0 qualifiers don't break inner-expression equivalence."""
    assert equivalent_patterns(patt1, patt2, stix_version="2.0")
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b!=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
            "[a:b!=1] START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'",
        ),
        (
            "[a:b<1] REPEATS 2 TIMES START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
            "[a:b<1] REPEATS 2 TIMES START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'",
        ),
        (
            "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES",
            "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES",
        ),
    ],
)
def test_startstop_not_equivalent(patt1, patt2):
    """Differing STIX 2.0 qualifier parameters make patterns non-equivalent."""
    assert not equivalent_patterns(patt1, patt2, stix_version="2.0")

View File

@ -0,0 +1,47 @@
"""
Pattern equivalence unit tests which use STIX 2.1+-specific pattern features
"""
import pytest
from stix2.equivalence.patterns import equivalent_patterns
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
        ),
        (
            "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
        ),
        (
            "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES",
            "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES",
        ),
    ],
)
def test_startstop_equivalent(patt1, patt2):
    """Matching STIX 2.1 qualifiers don't break inner-expression equivalence."""
    assert equivalent_patterns(patt1, patt2, stix_version="2.1")
@pytest.mark.parametrize(
    "patt1, patt2", [
        (
            "[a:b!=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
            "[a:b!=1] START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'",
        ),
        (
            "[a:b<1] REPEATS 2 TIMES START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
            "[a:b<1] REPEATS 2 TIMES START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'",
        ),
        (
            "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES",
            "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES",
        ),
    ],
)
def test_startstop_not_equivalent(patt1, patt2):
    """Differing STIX 2.1 qualifier parameters make patterns non-equivalent."""
    assert not equivalent_patterns(patt1, patt2, stix_version="2.1")

View File

@ -658,6 +658,7 @@ def test_parsing_integer_index():
patt_obj = create_pattern_object("[a:b[1]=2]")
assert str(patt_obj) == "[a:b[1] = 2]"
# This should never occur, because the first component will always be a property_name, and they should not be quoted.
def test_parsing_quoted_first_path_component():
patt_obj = create_pattern_object("[a:'b'[1]=2]")