From 311fe38ceaddd8da333192957879843d88c11b8e Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Mon, 10 Aug 2020 18:33:26 -0400
Subject: [PATCH 01/10] Add first cut of a pattern equivalence capability

---
 stix2/equivalence/__init__.py                 |   0
 stix2/equivalence/patterns/__init__.py        |  72 +++
 .../equivalence/patterns/compare/__init__.py  |  90 ++++
 .../patterns/compare/comparison.py            | 351 +++++++++++++
 .../patterns/compare/observation.py           | 124 +++++
 .../patterns/transform/__init__.py            |  56 ++
 .../patterns/transform/comparison.py          | 331 ++++++++++++
 .../patterns/transform/observation.py         | 486 ++++++++++++++++++
 8 files changed, 1510 insertions(+)
 create mode 100644 stix2/equivalence/__init__.py
 create mode 100644 stix2/equivalence/patterns/__init__.py
 create mode 100644 stix2/equivalence/patterns/compare/__init__.py
 create mode 100644 stix2/equivalence/patterns/compare/comparison.py
 create mode 100644 stix2/equivalence/patterns/compare/observation.py
 create mode 100644 stix2/equivalence/patterns/transform/__init__.py
 create mode 100644 stix2/equivalence/patterns/transform/comparison.py
 create mode 100644 stix2/equivalence/patterns/transform/observation.py

diff --git a/stix2/equivalence/__init__.py b/stix2/equivalence/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/stix2/equivalence/patterns/__init__.py b/stix2/equivalence/patterns/__init__.py
new file mode 100644
index 0000000..9965c35
--- /dev/null
+++ b/stix2/equivalence/patterns/__init__.py
@@ -0,0 +1,72 @@
+import stix2.pattern_visitor
+from stix2.equivalence.patterns.transform import (
+    ChainTransformer, SettleTransformer
+)
+from stix2.equivalence.patterns.compare.observation import (
+    observation_expression_cmp
+)
+from stix2.equivalence.patterns.transform.observation import (
+    CanonicalizeComparisonExpressionsTransformer,
+    AbsorptionTransformer,
+    FlattenTransformer,
+    DNFTransformer,
+    OrderDedupeTransformer
+)
+
+
+# Lazy-initialize
+_pattern_canonicalizer = None
+
+
+def _get_pattern_canonicalizer():
+    """
+    Get a canonicalization transformer for STIX patterns.
+
+    :return: The transformer
+    """
+
+    # The transformers are either stateless or contain no state which changes
+    # with each use.  So we can set up the transformers once and keep reusing
+    # them.
+    global _pattern_canonicalizer
+
+    if not _pattern_canonicalizer:
+        canonicalize_comp_expr = \
+            CanonicalizeComparisonExpressionsTransformer()
+
+        obs_expr_flatten = FlattenTransformer()
+        obs_expr_order = OrderDedupeTransformer()
+        obs_expr_absorb = AbsorptionTransformer()
+        obs_simplify = ChainTransformer(
+            obs_expr_flatten, obs_expr_order, obs_expr_absorb
+        )
+        obs_settle_simplify = SettleTransformer(obs_simplify)
+
+        obs_dnf = DNFTransformer()
+
+        _pattern_canonicalizer = ChainTransformer(
+            canonicalize_comp_expr,
+            obs_settle_simplify, obs_dnf, obs_settle_simplify
+        )
+
+    return _pattern_canonicalizer
+
+
+def equivalent_patterns(pattern1, pattern2):
+    """
+    Determine whether two STIX patterns are semantically equivalent.
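+
+    For example (an illustrative pair; any two patterns which differ only
+    in operand order would do):
+
+        equivalent_patterns(
+            "[ipv4-addr:value = '1.2.3.4'] OR [ipv4-addr:value = '5.6.7.8']",
+            "[ipv4-addr:value = '5.6.7.8'] OR [ipv4-addr:value = '1.2.3.4']",
+        )  # returns True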
+
+    :param pattern1: The first STIX pattern
+    :param pattern2: The second STIX pattern
+    :return: True if the patterns are semantically equivalent; False if not
+    """
+    patt_ast1 = stix2.pattern_visitor.create_pattern_object(pattern1)
+    patt_ast2 = stix2.pattern_visitor.create_pattern_object(pattern2)
+
+    pattern_canonicalizer = _get_pattern_canonicalizer()
+    canon_patt1, _ = pattern_canonicalizer.transform(patt_ast1)
+    canon_patt2, _ = pattern_canonicalizer.transform(patt_ast2)
+
+    result = observation_expression_cmp(canon_patt1, canon_patt2)
+
+    return result == 0
diff --git a/stix2/equivalence/patterns/compare/__init__.py b/stix2/equivalence/patterns/compare/__init__.py
new file mode 100644
index 0000000..a80de4f
--- /dev/null
+++ b/stix2/equivalence/patterns/compare/__init__.py
@@ -0,0 +1,90 @@
+"""
+Some generic comparison utility functions.
+"""
+
+def generic_cmp(value1, value2):
+    """
+    Generic comparator of values which uses the builtin '<' and '>' operators.
+    Assumes the values can be compared that way.
+
+    :param value1: The first value
+    :param value2: The second value
+    :return: -1, 0, or 1 depending on whether value1 is less than, equal to,
+        or greater than value2
+    """
+
+    return -1 if value1 < value2 else 1 if value1 > value2 else 0
+
+
+def iter_lex_cmp(seq1, seq2, cmp):
+    """
+    Generic lexicographical compare function, which works on two iterables and
+    a comparator function.
+
+    :param seq1: The first iterable
+    :param seq2: The second iterable
+    :param cmp: A two-arg callable comparator for values iterated over.  It
+        must behave analogously to this function, returning <0, 0, or >0 to
+        express the ordering of the two values.
+    :return: <0 if seq1 < seq2; >0 if seq1 > seq2; 0 if they're equal
+    """
+
+    it1 = iter(seq1)
+    it2 = iter(seq2)
+
+    it1_exhausted = it2_exhausted = False
+    while True:
+        try:
+            val1 = next(it1)
+        except StopIteration:
+            it1_exhausted = True
+
+        try:
+            val2 = next(it2)
+        except StopIteration:
+            it2_exhausted = True
+
+        # same length, all elements equal
+        if it1_exhausted and it2_exhausted:
+            result = 0
+            break
+
+        # one is a prefix of the other; the shorter one is less
+        elif it1_exhausted:
+            result = -1
+            break
+
+        elif it2_exhausted:
+            result = 1
+            break
+
+        # neither is exhausted; check values
+        else:
+            val_cmp = cmp(val1, val2)
+
+            if val_cmp != 0:
+                result = val_cmp
+                break
+
+    return result
+
+
+def iter_in(value, seq, cmp):
+    """
+    A function behaving like the "in" Python operator, but which works with a
+    comparator function.  This function checks whether the given value is
+    contained in the given iterable.
+
+    :param value: A value
+    :param seq: An iterable
+    :param cmp: A 2-arg comparator function which must return 0 if the args
+        are equal
+    :return: True if the value is found in the iterable, False if it is not
+    """
+    result = False
+    for seq_val in seq:
+        if cmp(value, seq_val) == 0:
+            result = True
+            break
+
+    return result
diff --git a/stix2/equivalence/patterns/compare/comparison.py b/stix2/equivalence/patterns/compare/comparison.py
new file mode 100644
index 0000000..03b16f4
--- /dev/null
+++ b/stix2/equivalence/patterns/compare/comparison.py
@@ -0,0 +1,351 @@
+"""
+Comparison utilities for STIX pattern comparison expressions.
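+
+All comparators here follow the classic "cmp" convention: they return a
+number <0, 0, or >0 to express an ordering.  E.g. (an illustrative call,
+using the generic helpers from this sub-package):
+
+    iter_lex_cmp([1, 2], [1, 2, 3], generic_cmp)  # negative: a prefix sorts first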
+""" +import base64 +import functools +from stix2.patterns import ( + _ComparisonExpression, AndBooleanExpression, OrBooleanExpression, + ListObjectPathComponent, IntegerConstant, FloatConstant, StringConstant, + BooleanConstant, TimestampConstant, HexConstant, BinaryConstant, + ListConstant +) +from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp + + +_COMPARISON_OP_ORDER = ( + "=", "!=", "<>", "<", "<=", ">", ">=", + "IN", "LIKE", "MATCHES", "ISSUBSET", "ISSUPERSET" +) + + +_CONSTANT_TYPE_ORDER = ( + # ints/floats come first, but have special handling since the types are + # treated equally as a generic "number" type. So they aren't in this list. + # See constant_cmp(). + StringConstant, BooleanConstant, + TimestampConstant, HexConstant, BinaryConstant, ListConstant +) + + +def generic_constant_cmp(const1, const2): + """ + Generic comparator for most _Constant instances. They must have a "value" + attribute whose value supports the builtin comparison operators. + + :param const1: The first _Constant instance + :param const2: The second _Constant instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + return generic_cmp(const1.value, const2.value) + + +def bool_cmp(value1, value2): + """ + Compare two boolean constants. + + :param value1: The first BooleanConstant instance + :param value2: The second BooleanConstant instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # unwrap from _Constant instances + value1 = value1.value + value2 = value2.value + + if (value1 and value2) or (not value1 and not value2): + result = 0 + + # Let's say... True < False? + elif value1: + result = -1 + + else: + result = 1 + + return result + + +def hex_cmp(value1, value2): + """ + Compare two STIX "hex" values. This decodes to bytes and compares that. + It does *not* do a string compare on the hex representations. + + :param value1: The first HexConstant + :param value2: The second HexConstant + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + bytes1 = bytes.fromhex(value1.value) + bytes2 = bytes.fromhex(value2.value) + + return generic_cmp(bytes1, bytes2) + + +def bin_cmp(value1, value2): + """ + Compare two STIX "binary" values. This decodes to bytes and compares that. + It does *not* do a string compare on the base64 representations. + + :param value1: The first BinaryConstant + :param value2: The second BinaryConstant + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + bytes1 = base64.standard_b64decode(value1.value) + bytes2 = base64.standard_b64decode(value2.value) + + return generic_cmp(bytes1, bytes2) + + +def list_cmp(value1, value2): + """ + Compare lists order-insensitively. + + :param value1: The first ListConstant + :param value2: The second ListConstant + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # Achieve order-independence by sorting the lists first. + sorted_value1 = sorted( + value1.value, key=functools.cmp_to_key(constant_cmp) + ) + + sorted_value2 = sorted( + value2.value, key=functools.cmp_to_key(constant_cmp) + ) + + result = iter_lex_cmp(sorted_value1, sorted_value2, constant_cmp) + + return result + + +_CONSTANT_COMPARATORS = { + # We have special handling for ints/floats, so no entries for those AST + # classes here. See constant_cmp(). 
+ StringConstant: generic_constant_cmp, + BooleanConstant: bool_cmp, + TimestampConstant: generic_constant_cmp, + HexConstant: hex_cmp, + BinaryConstant: bin_cmp, + ListConstant: list_cmp +} + + +def object_path_component_cmp(comp1, comp2): + """ + Compare a string/int to another string/int; this induces an ordering over + all strings and ints. It is used to perform a lexicographical sort on + object paths. + + Ints and strings compare as usual to each other; ints compare less than + strings. + + :param comp1: An object path component (string or int) + :param comp2: An object path component (string or int) + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # both ints or both strings: use builtin comparison operators + if (isinstance(comp1, int) and isinstance(comp2, int)) \ + or (isinstance(comp1, str) and isinstance(comp2, str)): + result = generic_cmp(comp1, comp2) + + # one is int, one is string. Let's say ints come before strings. + elif isinstance(comp1, int): + result = -1 + + else: + result = 1 + + return result + + +def object_path_to_raw_values(path): + """ + Converts the given ObjectPath instance to a list of strings and ints. + All property names become strings, regardless of whether they're *_ref + properties; "*" index steps become that string; and numeric index steps + become integers. + + :param path: An ObjectPath instance + :return: A generator iterator over the values + """ + + for comp in path.property_path: + if isinstance(comp, ListObjectPathComponent): + yield comp.property_name + + if comp.index == "*" or isinstance(comp.index, int): + yield comp.index + else: + # in case the index is a stringified int; convert to an actual + # int + yield int(comp.index) + + else: + yield comp.property_name + + +def object_path_cmp(path1, path2): + """ + Compare two object paths. + + :param path1: The first ObjectPath instance + :param path2: The second ObjectPath instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + if path1.object_type_name < path2.object_type_name: + result = -1 + + elif path1.object_type_name > path2.object_type_name: + result = 1 + + else: + # I always thought of key and index path steps as separate. The AST + # lumps indices in with the previous key as a single path component. + # The following splits the path components into individual comparable + # values again. Maybe I should not do this... + path_vals1 = object_path_to_raw_values(path1) + path_vals2 = object_path_to_raw_values(path2) + result = iter_lex_cmp( + path_vals1, path_vals2, object_path_component_cmp + ) + + return result + + +def comparison_operator_cmp(op1, op2): + """ + Compare two comparison operators. + + :param op1: The first comparison operator (a string) + :param op2: The second comparison operator (a string) + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + op1_idx = _COMPARISON_OP_ORDER.index(op1) + op2_idx = _COMPARISON_OP_ORDER.index(op2) + + result = generic_cmp(op1_idx, op2_idx) + + return result + + +def constant_cmp(value1, value2): + """ + Compare two constants. + + :param value1: The first _Constant instance + :param value2: The second _Constant instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # Special handling for ints/floats: treat them generically as numbers, + # ordered before all other types. 
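+    # E.g. (illustrative): IntegerConstant(5) and FloatConstant(5.0) compare
+    # as equal here, since only the underlying numeric values are compared.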
+ if isinstance(value1, (IntegerConstant, FloatConstant)) \ + and isinstance(value2, (IntegerConstant, FloatConstant)): + result = generic_constant_cmp(value1, value2) + + elif isinstance(value1, (IntegerConstant, FloatConstant)): + result = -1 + + elif isinstance(value2, (IntegerConstant, FloatConstant)): + result = 1 + + else: + + type1 = type(value1) + type2 = type(value2) + + type1_idx = _CONSTANT_TYPE_ORDER.index(type1) + type2_idx = _CONSTANT_TYPE_ORDER.index(type2) + + result = generic_cmp(type1_idx, type2_idx) + if result == 0: + # Types are the same; must compare values + cmp_func = _CONSTANT_COMPARATORS.get(type1) + if not cmp_func: + raise TypeError("Don't know how to compare " + type1.__name__) + + result = cmp_func(value1, value2) + + return result + + +def simple_comparison_expression_cmp(expr1, expr2): + """ + Compare "simple" comparison expressions: those which aren't AND/OR + combinations, just comparisons. + + :param expr1: first _ComparisonExpression instance + :param expr2: second _ComparisonExpression instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + result = object_path_cmp(expr1.lhs, expr2.lhs) + + if result == 0: + result = comparison_operator_cmp(expr1.operator, expr2.operator) + + if result == 0: + # _ComparisonExpression's have a "negated" attribute. Umm... + # non-negated < negated? + if not expr1.negated and expr2.negated: + result = -1 + elif expr1.negated and not expr2.negated: + result = 1 + + if result == 0: + result = constant_cmp(expr1.rhs, expr2.rhs) + + return result + + +def comparison_expression_cmp(expr1, expr2): + """ + Compare two comparison expressions. This is sensitive to the order of the + expressions' sub-components. To achieve an order-insensitive comparison, + the ASTs must be canonically ordered first. + + :param expr1: The first comparison expression + :param expr2: The second comparison expression + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + if isinstance(expr1, _ComparisonExpression) \ + and isinstance(expr2, _ComparisonExpression): + result = simple_comparison_expression_cmp(expr1, expr2) + + # One is simple, one is compound. Let's say... simple ones come first? + elif isinstance(expr1, _ComparisonExpression): + result = -1 + + elif isinstance(expr2, _ComparisonExpression): + result = 1 + + # Both are compound: AND's before OR's? + elif isinstance(expr1, AndBooleanExpression) \ + and isinstance(expr2, OrBooleanExpression): + result = -1 + + elif isinstance(expr1, OrBooleanExpression) \ + and isinstance(expr2, AndBooleanExpression): + result = 1 + + else: + # Both compound, same boolean operator: sort according to contents. + # This will order according to recursive invocations of this comparator, + # on sub-expressions. + result = iter_lex_cmp( + expr1.operands, expr2.operands, comparison_expression_cmp + ) + + return result diff --git a/stix2/equivalence/patterns/compare/observation.py b/stix2/equivalence/patterns/compare/observation.py new file mode 100644 index 0000000..66513da --- /dev/null +++ b/stix2/equivalence/patterns/compare/observation.py @@ -0,0 +1,124 @@ +""" +Comparison utilities for STIX pattern observation expressions. 
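+
+As elsewhere, the comparators return <0, 0, or >0.  Differing node types
+order by type, per _OBSERVATION_EXPRESSION_TYPE_ORDER below; e.g.
+(illustrative) "[a]" sorts before "[a] AND [b]", which sorts before
+"[a] OR [b]".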
+""" +from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp +from stix2.equivalence.patterns.compare.comparison import ( + comparison_expression_cmp, generic_constant_cmp +) +from stix2.patterns import ( + ObservationExpression, AndObservationExpression, OrObservationExpression, + QualifiedObservationExpression, _CompoundObservationExpression, + RepeatQualifier, WithinQualifier, StartStopQualifier, + FollowedByObservationExpression +) + + +_OBSERVATION_EXPRESSION_TYPE_ORDER = ( + ObservationExpression, AndObservationExpression, OrObservationExpression, + FollowedByObservationExpression, QualifiedObservationExpression +) + + +_QUALIFIER_TYPE_ORDER = ( + RepeatQualifier, WithinQualifier, StartStopQualifier +) + + +def repeats_cmp(qual1, qual2): + """ + Compare REPEATS qualifiers. This orders by repeat count. + """ + return generic_constant_cmp(qual1.times_to_repeat, qual2.times_to_repeat) + + +def within_cmp(qual1, qual2): + """ + Compare WITHIN qualifiers. This orders by number of seconds. + """ + return generic_constant_cmp( + qual1.number_of_seconds, qual2.number_of_seconds + ) + + +def startstop_cmp(qual1, qual2): + """ + Compare START/STOP qualifiers. This lexicographically orders by start time, + then stop time. + """ + return iter_lex_cmp( + (qual1.start_time, qual1.stop_time), + (qual2.start_time, qual2.stop_time), + generic_constant_cmp + ) + + +_QUALIFIER_COMPARATORS = { + RepeatQualifier: repeats_cmp, + WithinQualifier: within_cmp, + StartStopQualifier: startstop_cmp +} + + +def observation_expression_cmp(expr1, expr2): + """ + Compare two observation expression ASTs. This is sensitive to the order of + the expressions' sub-components. To achieve an order-insensitive + comparison, the ASTs must be canonically ordered first. + + :param expr1: The first observation expression + :param expr2: The second observation expression + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + type1 = type(expr1) + type2 = type(expr2) + + type1_idx = _OBSERVATION_EXPRESSION_TYPE_ORDER.index(type1) + type2_idx = _OBSERVATION_EXPRESSION_TYPE_ORDER.index(type2) + + if type1_idx != type2_idx: + result = generic_cmp(type1_idx, type2_idx) + + # else, both exprs are of same type. + + # If they're simple, use contained comparison expression order + elif type1 is ObservationExpression: + result = comparison_expression_cmp( + expr1.operand, expr2.operand + ) + + elif isinstance(expr1, _CompoundObservationExpression): + # Both compound, and of same type (and/or/followedby): sort according + # to contents. + result = iter_lex_cmp( + expr1.operands, expr2.operands, observation_expression_cmp + ) + + else: # QualifiedObservationExpression + # Both qualified. Check qualifiers first; if they are the same, + # use order of the qualified expressions. 
+        qual1_type = type(expr1.qualifier)
+        qual2_type = type(expr2.qualifier)
+
+        qual1_type_idx = _QUALIFIER_TYPE_ORDER.index(qual1_type)
+        qual2_type_idx = _QUALIFIER_TYPE_ORDER.index(qual2_type)
+
+        result = generic_cmp(qual1_type_idx, qual2_type_idx)
+
+        if result == 0:
+            # Same qualifier type; compare qualifier details
+            qual_cmp = _QUALIFIER_COMPARATORS.get(qual1_type)
+            if qual_cmp:
+                result = qual_cmp(expr1.qualifier, expr2.qualifier)
+            else:
+                raise TypeError(
+                    "Can't compare qualifier type: " + qual1_type.__name__
+                )
+
+        if result == 0:
+            # Same qualifier type and details; use qualified expression order
+            result = observation_expression_cmp(
+                expr1.observation_expression, expr2.observation_expression
+            )
+
+    return result
diff --git a/stix2/equivalence/patterns/transform/__init__.py b/stix2/equivalence/patterns/transform/__init__.py
new file mode 100644
index 0000000..5df9061
--- /dev/null
+++ b/stix2/equivalence/patterns/transform/__init__.py
@@ -0,0 +1,56 @@
+"""
+Generic AST transformation classes.
+"""
+
+class Transformer:
+    """
+    Base class for AST transformers.
+    """
+    def transform(self, ast):
+        """
+        Transform the given AST and return the resulting AST.
+
+        :param ast: The AST to transform
+        :return: A 2-tuple: the transformed AST and a boolean indicating whether
+            the transformation actually changed anything.  The change detection
+            is useful in situations where a transformation needs to be repeated
+            until the AST stops changing.
+        """
+        raise NotImplementedError("transform")
+
+
+class ChainTransformer(Transformer):
+    """
+    A composite transformer which consists of a sequence of sub-transformers.
+    Applying this transformer applies all sub-transformers in sequence, as
+    a group.
+    """
+    def __init__(self, *transformers):
+        self.__transformers = transformers
+
+    def transform(self, ast):
+        changed = False
+        for transformer in self.__transformers:
+            ast, this_changed = transformer.transform(ast)
+            if this_changed:
+                changed = True
+
+        return ast, changed
+
+
+class SettleTransformer(Transformer):
+    """
+    A transformer that repeatedly performs a transformation until that
+    transformation no longer changes the AST, i.e. the AST has "settled".
+    """
+    def __init__(self, transform):
+        self.__transformer = transform
+
+    def transform(self, ast):
+        changed = False
+        ast, this_changed = self.__transformer.transform(ast)
+        while this_changed:
+            changed = True
+            ast, this_changed = self.__transformer.transform(ast)
+
+        return ast, changed
diff --git a/stix2/equivalence/patterns/transform/comparison.py b/stix2/equivalence/patterns/transform/comparison.py
new file mode 100644
index 0000000..35cd8a8
--- /dev/null
+++ b/stix2/equivalence/patterns/transform/comparison.py
@@ -0,0 +1,331 @@
+"""
+Transformation utilities for STIX pattern comparison expressions.
+"""
+import functools
+import itertools
+from stix2.equivalence.patterns.transform import Transformer
+from stix2.patterns import (
+    _BooleanExpression, _ComparisonExpression, AndBooleanExpression,
+    OrBooleanExpression, ParentheticalExpression
+)
+from stix2.equivalence.patterns.compare.comparison import (
+    comparison_expression_cmp
+)
+from stix2.equivalence.patterns.compare import iter_lex_cmp, iter_in
+
+
+def _dupe_ast(ast):
+    """
+    Create a duplicate of the given AST.
+
+    Note: the comparison expression "leaves", i.e. simple
+    comparisons are currently not duplicated.  I don't think it's necessary as
+    of this writing; they are never changed.  But revisit this if/when
+    necessary.
+ + :param ast: The AST to duplicate + :return: The duplicate AST + """ + if isinstance(ast, AndBooleanExpression): + result = AndBooleanExpression([ + _dupe_ast(operand) for operand in ast.operands + ]) + + elif isinstance(ast, OrBooleanExpression): + result = OrBooleanExpression([ + _dupe_ast(operand) for operand in ast.operands + ]) + + elif isinstance(ast, _ComparisonExpression): + # Change this to create a dupe, if we ever need to change simple + # comparison expressions as part of canonicalization. + result = ast + + else: + raise TypeError("Can't duplicate " + type(ast).__name__) + + return result + + +class ComparisonExpressionTransformer(Transformer): + """ + Transformer base class with special support for transforming comparison + expressions. The transform method implemented here performs a bottom-up + in-place transformation, with support for some comparison + expression-specific callbacks. + + Specifically, subclasses can implement methods: + "transform_or" for OR nodes + "transform_and" for AND nodes + "transform_default" for both types of nodes + + "transform_default" is a fallback, if a type-specific callback is not + found. The default implementation does nothing to the AST. The + type-specific callbacks are preferred over the default, if both exist. + + In all cases, the callbacks are called with an AST for a subtree rooted at + the appropriate node type, where the subtree's children have already been + transformed. They must return the same thing as the base transform() + method: a 2-tuple with the transformed AST and a boolean for change + detection. See doc for the superclass' method. + + This process currently silently drops parenthetical nodes, and "leaf" + comparison expression nodes are left unchanged. + """ + + def transform(self, ast): + if isinstance(ast, _BooleanExpression): + changed = False + for i, operand in enumerate(ast.operands): + operand_result, this_changed = self.transform(operand) + if this_changed: + changed = True + + ast.operands[i] = operand_result + + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, _ComparisonExpression): + # Terminates recursion; we don't change these nodes + result = ast + changed = False + + elif isinstance(ast, ParentheticalExpression): + # Drop these + result, changed = self.transform(ast.expression) + + else: + raise TypeError("Not a comparison expression: " + str(ast)) + + return result, changed + + def __dispatch_transform(self, ast): + """ + Invoke a transformer callback method based on the given ast root node + type. + + :param ast: The AST + :return: The callback's result + """ + + if isinstance(ast, AndBooleanExpression): + meth = getattr(self, "transform_and", self.transform_default) + + elif isinstance(ast, OrBooleanExpression): + meth = getattr(self, "transform_or", self.transform_default) + + else: + meth = self.transform_default + + return meth(ast) + + def transform_default(self, ast): + """ + Override to handle transforming AST nodes which don't have a more + specific method implemented. + """ + return ast, False + + +class OrderDedupeTransformer( + ComparisonExpressionTransformer +): + """ + Canonically order the children of all nodes in the AST. Because the + deduping algorithm is based on sorted data, this transformation also does + deduping. + + E.g.: + A and A => A + A or A => A + """ + + def transform_default(self, ast): + """ + Sort/dedupe children. AND and OR can be treated identically. 
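+
+        E.g.:
+            B and A and A => A and B
+            B or A or B => A or B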
+ + :param ast: The comparison expression AST + :return: The same AST node, but with sorted children + """ + sorted_children = sorted( + ast.operands, key=functools.cmp_to_key(comparison_expression_cmp) + ) + + deduped_children = [ + # Apparently when using a key function, groupby()'s "keys" are the + # key wrappers, not actual sequence values. Obviously we don't + # need key wrappers in our ASTs! + k.obj for k, _ in itertools.groupby( + sorted_children, key=functools.cmp_to_key( + comparison_expression_cmp + ) + ) + ] + + changed = iter_lex_cmp( + ast.operands, deduped_children, comparison_expression_cmp + ) != 0 + + ast.operands = deduped_children + + return ast, changed + + +class FlattenTransformer(ComparisonExpressionTransformer): + """ + Flatten all nodes of the AST. E.g.: + + A and (B and C) => A and B and C + A or (B or C) => A or B or C + (A) => A + """ + + def transform_default(self, ast): + """ + Flatten children. AND and OR can be treated mostly identically. The + little difference is that we can absorb AND children if we're an AND + ourselves; and OR for OR. + + :param ast: The comparison expression AST + :return: The same AST node, but with flattened children + """ + + if isinstance(ast, _BooleanExpression) and len(ast.operands) == 1: + # Replace an AND/OR with one child, with the child itself. + ast = ast.operands[0] + changed = True + + else: + flat_operands = [] + changed = False + for operand in ast.operands: + if isinstance(operand, _BooleanExpression) \ + and ast.operator == operand.operator: + flat_operands.extend(operand.operands) + changed = True + + else: + flat_operands.append(operand) + + ast.operands = flat_operands + + return ast, changed + + +class AbsorptionTransformer( + ComparisonExpressionTransformer +): + """ + Applies boolean "absorption" rules for AST simplification. E.g.: + + A and (A or B) = A + A or (A and B) = A + """ + + def transform_default(self, ast): + + changed = False + if isinstance(ast, _BooleanExpression): + secondary_op = "AND" if ast.operator == "OR" else "OR" + + to_delete = set() + + # Check i (child1) against j to see if we can delete j. + for i, child1 in enumerate(ast.operands): + if i in to_delete: + continue + + for j, child2 in enumerate(ast.operands): + if i == j or j in to_delete: + continue + + # We're checking if child1 is contained in child2, so + # child2 has to be a compound object, not just a simple + # comparison expression. We also require the right operator + # for child2: "AND" if ast is "OR" and vice versa. + if not isinstance(child2, _BooleanExpression) \ + or child2.operator != secondary_op: + continue + + # The simple check: is child1 contained in child2? + if iter_in( + child1, child2.operands, comparison_expression_cmp + ): + to_delete.add(j) + + # A more complicated check: does child1 occur in child2 + # in a "flattened" form? + elif child1.operator == child2.operator: + if all( + iter_in( + child1_operand, child2.operands, + comparison_expression_cmp + ) + for child1_operand in child1.operands + ): + to_delete.add(j) + + if to_delete: + changed = True + + for i in reversed(sorted(to_delete)): + del ast.operands[i] + + return ast, changed + + +class DNFTransformer(ComparisonExpressionTransformer): + """ + Convert a comparison expression AST to DNF. 
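+    ("DNF" is disjunctive normal form: an OR of ANDs, with no OR nested
+    inside an AND.)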
E.g.: + + A and (B or C) => (A and B) or (A and C) + """ + def transform_and(self, ast): + or_children = [] + other_children = [] + changed = False + + # Sort AND children into two piles: the ORs and everything else + for child in ast.operands: + if isinstance(child, _BooleanExpression) and child.operator == "OR": + # Need a list of operand lists, so we can compute the + # product below. + or_children.append(child.operands) + else: + other_children.append(child) + + if or_children: + distributed_children = [ + AndBooleanExpression([ + # Make dupes: distribution implies adding repetition, and + # we should ensure each repetition is independent of the + # others. + _dupe_ast(sub_ast) for sub_ast in itertools.chain( + other_children, prod_seq + ) + ]) + for prod_seq in itertools.product(*or_children) + ] + + # Need to recursively continue to distribute AND over OR in + # any of our new sub-expressions which need it. This causes + # more downward recursion in the midst of this bottom-up transform. + # It's not good for performance. I wonder if a top-down + # transformation algorithm would make more sense in this phase? + # But then we'd be using two different algorithms for the same + # thing... Maybe this transform should be completely top-down + # (no bottom-up component at all)? + distributed_children = [ + self.transform(child)[0] for child in distributed_children + ] + + result = OrBooleanExpression(distributed_children) + changed = True + + else: + # No AND-over-OR; nothing to do + result = ast + + return result, changed diff --git a/stix2/equivalence/patterns/transform/observation.py b/stix2/equivalence/patterns/transform/observation.py new file mode 100644 index 0000000..122a219 --- /dev/null +++ b/stix2/equivalence/patterns/transform/observation.py @@ -0,0 +1,486 @@ +""" +Transformation utilities for STIX pattern observation expressions. +""" +import functools +import itertools +from stix2.patterns import ( + ObservationExpression, AndObservationExpression, OrObservationExpression, + QualifiedObservationExpression, _CompoundObservationExpression, + ParentheticalExpression, FollowedByObservationExpression +) +from stix2.equivalence.patterns.transform import ( + ChainTransformer, SettleTransformer, Transformer +) +from stix2.equivalence.patterns.transform.comparison import ( + FlattenTransformer as CFlattenTransformer, + OrderDedupeTransformer as COrderDedupeTransformer, + AbsorptionTransformer as CAbsorptionTransformer, + DNFTransformer as CDNFTransformer +) +from stix2.equivalence.patterns.compare import iter_lex_cmp, iter_in +from stix2.equivalence.patterns.compare.observation import observation_expression_cmp + + +def _dupe_ast(ast): + """ + Create a duplicate of the given AST. The AST root must be an observation + expression of some kind (AND/OR/qualified, etc). + + Note: the observation expression "leaves", i.e. simple square-bracket + observation expressions are currently not duplicated. I don't think it's + necessary as of this writing. But revisit this if/when necessary. 
+ + :param ast: The AST to duplicate + :return: The duplicate AST + """ + if isinstance(ast, AndObservationExpression): + result = AndObservationExpression([ + _dupe_ast(child) for child in ast.operands + ]) + + elif isinstance(ast, OrObservationExpression): + result = OrObservationExpression([ + _dupe_ast(child) for child in ast.operands + ]) + + elif isinstance(ast, FollowedByObservationExpression): + result = FollowedByObservationExpression([ + _dupe_ast(child) for child in ast.operands + ]) + + elif isinstance(ast, QualifiedObservationExpression): + # Don't need to dupe the qualifier object at this point + result = QualifiedObservationExpression( + _dupe_ast(ast.observation_expression), ast.qualifier + ) + + elif isinstance(ast, ObservationExpression): + result = ast + + else: + raise TypeError("Can't duplicate " + type(ast).__name__) + + return result + + +class ObservationExpressionTransformer(Transformer): + """ + Transformer base class with special support for transforming observation + expressions. The transform method implemented here performs a bottom-up + in-place transformation, with support for some observation + expression-specific callbacks. It recurses down as far as the "leaf node" + observation expressions; it does not go inside of them, to the individual + components of a comparison expression. + + Specifically, subclasses can implement methods: + "transform_or" for OR nodes + "transform_and" for AND nodes + "transform_followedby" for FOLLOWEDBY nodes + "transform_qualified" for qualified nodes (all qualifier types) + "transform_observation" for "leaf" observation expression nodes + "transform_default" for all types of nodes + + "transform_default" is a fallback, if a type-specific callback is not + found. The default implementation does nothing to the AST. The + type-specific callbacks are preferred over the default, if both exist. + + In all cases, the callbacks are called with an AST for a subtree rooted at + the appropriate node type, where the AST's children have already been + transformed. They must return the same thing as the base transform() + method: a 2-tuple with the transformed AST and a boolean for change + detection. See doc for the superclass' method. + + This process currently silently drops parenthetical nodes. + """ + + # Determines how AST node types map to callback method names + _DISPATCH_NAME_MAP = { + ObservationExpression: "observation", + AndObservationExpression: "and", + OrObservationExpression: "or", + FollowedByObservationExpression: "followedby", + QualifiedObservationExpression: "qualified" + } + + def transform(self, ast): + + changed = False + if isinstance(ast, ObservationExpression): + # A "leaf node" for observation expressions. We don't recurse into + # these. + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, _CompoundObservationExpression): + for i, operand in enumerate(ast.operands): + result, this_changed = self.transform(operand) + if this_changed: + ast.operands[i] = result + changed = True + + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, QualifiedObservationExpression): + # I don't think we need to process/transform the qualifier by + # itself, do we? 
+ result, this_changed = self.transform(ast.observation_expression) + if this_changed: + ast.observation_expression = result + changed = True + + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, ParentheticalExpression): + result, _ = self.transform(ast.expression) + # Dropping a node is a change, right? + changed = True + + else: + raise TypeError("Not an observation expression: {}: {}".format( + type(ast).__name__, str(ast) + )) + + return result, changed + + def __dispatch_transform(self, ast): + """ + Invoke a transformer callback method based on the given ast root node + type. + + :param ast: The AST + :return: The callback's result + """ + + dispatch_name = self._DISPATCH_NAME_MAP.get(type(ast)) + if dispatch_name: + meth_name = "transform_" + dispatch_name + meth = getattr(self, meth_name, self.transform_default) + else: + meth = self.transform_default + + return meth(ast) + + def transform_default(self, ast): + return ast, False + + +class FlattenTransformer(ObservationExpressionTransformer): + """ + Flatten an observation expression AST. E.g.: + + A and (B and C) => A and B and C + A or (B or C) => A or B or C + A followedby (B followedby C) => A followedby B followedby C + (A) => A + """ + + def __transform(self, ast): + + changed = False + + if len(ast.operands) == 1: + # Replace an AND/OR/FOLLOWEDBY with one child, with the child + # itself. + result = ast.operands[0] + changed = True + + else: + flat_children = [] + for operand in ast.operands: + if isinstance(operand, _CompoundObservationExpression) \ + and ast.operator == operand.operator: + flat_children.extend(operand.operands) + changed = True + else: + flat_children.append(operand) + + ast.operands = flat_children + result = ast + + return result, changed + + def transform_and(self, ast): + return self.__transform(ast) + + def transform_or(self, ast): + return self.__transform(ast) + + def transform_followedby(self, ast): + return self.__transform(ast) + + +class OrderDedupeTransformer( + ObservationExpressionTransformer +): + """ + Canonically order AND/OR expressions, and dedupe ORs. E.g.: + + A or A => A + B or A => A or B + B and A => A and B + """ + + def __transform(self, ast): + sorted_children = sorted( + ast.operands, key=functools.cmp_to_key(observation_expression_cmp) + ) + + # Deduping only applies to ORs + if ast.operator == "OR": + deduped_children = [ + key.obj for key, _ in itertools.groupby( + sorted_children, key=functools.cmp_to_key( + observation_expression_cmp + ) + ) + ] + else: + deduped_children = sorted_children + + changed = iter_lex_cmp( + ast.operands, deduped_children, observation_expression_cmp + ) != 0 + + ast.operands = deduped_children + + return ast, changed + + def transform_and(self, ast): + return self.__transform(ast) + + def transform_or(self, ast): + return self.__transform(ast) + + +class AbsorptionTransformer( + ObservationExpressionTransformer +): + """ + Applies boolean "absorption" rules for observation expressions, for AST + simplification: + + A or (A and B) = A + A or (A followedby B) = A + + Other variants do not hold for observation expressions. + """ + + def __is_contained_and(self, exprs_containee, exprs_container): + """ + Determine whether the "containee" expressions are contained in the + "container" expressions, with AND semantics (order-independent but need + distinct bindings). 
For example (with containee on left and container + on right): + + (A and A and B) or (A and B and C) + + In the above, all of the lhs vars have a counterpart in the rhs, but + there are two A's on the left and only one on the right. Therefore, + the right does not "contain" the left. You would need two A's on the + right. + + :param exprs_containee: The expressions we want to check for containment + :param exprs_container: The expressions acting as the "container" + :return: True if the containee is contained in the container; False if + not + """ + + # make our own list we are free to manipulate without affecting the + # function args. + container = list(exprs_container) + + result = True + for ee in exprs_containee: + for i, er in enumerate(container): + if observation_expression_cmp(ee, er) == 0: + # Found a match in the container; delete it so we never try + # to match a container expr to two different containee + # expressions. + del container[i] + break + else: + result = False + break + + return result + + def __is_contained_followedby(self, exprs_containee, exprs_container): + """ + Determine whether the "containee" expressions are contained in the + "container" expressions, with FOLLOWEDBY semantics (order-sensitive and + need distinct bindings). For example (with containee on left and + container on right): + + (A followedby B) or (B followedby A) + + In the above, all of the lhs vars have a counterpart in the rhs, but + the vars on the right are not in the same order. Therefore, the right + does not "contain" the left. The container vars don't have to be + contiguous though. E.g. in: + + (A followedby B) or (D followedby A followedby C followedby B) + + in the container (rhs), B follows A, so it "contains" the lhs even + though there is other stuff mixed in. + + :param exprs_containee: The expressions we want to check for containment + :param exprs_container: The expressions acting as the "container" + :return: True if the containee is contained in the container; False if + not + """ + + ee_iter = iter(exprs_containee) + er_iter = iter(exprs_container) + + result = True + while True: + ee = next(ee_iter, None) + if not ee: + break + + while True: + er = next(er_iter, None) + if er: + if observation_expression_cmp(ee, er) == 0: + break + else: + break + + if not er: + result = False + break + + return result + + def transform_or(self, ast): + changed = False + to_delete = set() + for i, child1 in enumerate(ast.operands): + if i in to_delete: + continue + + # The simplification doesn't work across qualifiers + if isinstance(child1, QualifiedObservationExpression): + continue + + for j, child2 in enumerate(ast.operands): + if i == j or j in to_delete: + continue + + if isinstance( + child2, ( + AndObservationExpression, + FollowedByObservationExpression + ) + ): + # The simple check: is child1 contained in child2? + if iter_in( + child1, child2.operands, observation_expression_cmp + ): + to_delete.add(j) + + # A more complicated check: does child1 occur in child2 + # in a "flattened" form? 
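+                    # E.g. (illustrative):
+                    #   ([a] AND [b]) OR ([a] AND [b] AND [c])
+                    # absorbs to just ([a] AND [b]).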
+ elif type(child1) is type(child2): + if isinstance(child1, AndObservationExpression): + can_simplify = self.__is_contained_and( + child1.operands, child2.operands + ) + else: # child1 and 2 are followedby nodes + can_simplify = self.__is_contained_followedby( + child1.operands, child2.operands + ) + + if can_simplify: + to_delete.add(j) + + if to_delete: + changed = True + + for i in reversed(sorted(to_delete)): + del ast.operands[i] + + return ast, changed + + +class DNFTransformer(ObservationExpressionTransformer): + """ + Transform an observation expression to DNF. This will distribute AND and + FOLLOWEDBY over OR: + + A and (B or C) => (A and B) or (A and C) + A followedby (B or C) => (A followedby B) or (A followedby C) + """ + + def __transform(self, ast): + + root_type = type(ast) # will be AST class for AND or FOLLOWEDBY + changed = False + or_children = [] + other_children = [] + for child in ast.operands: + if isinstance(child, OrObservationExpression): + or_children.append(child.operands) + else: + other_children.append(child) + + if or_children: + distributed_children = [ + root_type([ + _dupe_ast(sub_ast) for sub_ast in itertools.chain( + other_children, prod_seq + ) + ]) + for prod_seq in itertools.product(*or_children) + ] + + # Need to recursively continue to distribute AND/FOLLOWEDBY over OR + # in any of our new sub-expressions which need it. + distributed_children = [ + self.transform(child)[0] for child in distributed_children + ] + + result = OrObservationExpression(distributed_children) + changed = True + + else: + result = ast + + return result, changed + + def transform_and(self, ast): + return self.__transform(ast) + + def transform_followedby(self, ast): + return self.__transform(ast) + + +class CanonicalizeComparisonExpressionsTransformer( + ObservationExpressionTransformer +): + """ + Canonicalize all comparison expressions. + """ + def __init__(self): + comp_flatten = CFlattenTransformer() + comp_order = COrderDedupeTransformer() + comp_absorb = CAbsorptionTransformer() + simplify = ChainTransformer(comp_flatten, comp_order, comp_absorb) + settle_simplify = SettleTransformer(simplify) + + comp_dnf = CDNFTransformer() + self.__comp_canonicalize = ChainTransformer( + settle_simplify, comp_dnf, settle_simplify + ) + + def transform_observation(self, ast): + comp_expr = ast.operand + canon_comp_expr, changed = self.__comp_canonicalize.transform(comp_expr) + ast.operand = canon_comp_expr + + return ast, changed From 5d6c7d8c8a5906b089d0844522f1781b524da32d Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Wed, 12 Aug 2020 19:28:35 -0400 Subject: [PATCH 02/10] Add some simple context-sensitive constant canonicalization, used as part of canonicalizing comparison expressions. This required adding a new comparison expression transformer callback for leaf-node comparison expression objects, and updating all existing comparison transformers to work (it affected all/most of them). The observation expression transformer which actually does the comparison canonicalization was updated to also perform this special canonicalization step. 
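
For example (an illustrative pair, not taken from the test suite): with
this change, the patterns

    [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\Foo']
    [windows-registry-key:key = 'hkey_local_machine\\foo']

now canonicalize to the same comparison expression, since the key value
is lower-cased before comparison.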
---
 .../patterns/transform/comparison.py          | 136 +++++----
 .../patterns/transform/observation.py         |   6 +-
 .../patterns/transform/specials.py            | 215 ++++++++++++++++++
 3 files changed, 310 insertions(+), 47 deletions(-)
 create mode 100644 stix2/equivalence/patterns/transform/specials.py

diff --git a/stix2/equivalence/patterns/transform/comparison.py b/stix2/equivalence/patterns/transform/comparison.py
index 35cd8a8..2848598 100644
--- a/stix2/equivalence/patterns/transform/comparison.py
+++ b/stix2/equivalence/patterns/transform/comparison.py
@@ -4,6 +4,9 @@ Transformation utilities for STIX pattern comparison expressions.
 import functools
 import itertools
 from stix2.equivalence.patterns.transform import Transformer
+from stix2.equivalence.patterns.transform.specials import (
+    windows_reg_key, ipv4_addr, ipv6_addr
+)
 from stix2.patterns import (
     _BooleanExpression, _ComparisonExpression, AndBooleanExpression,
     OrBooleanExpression, ParentheticalExpression
@@ -57,6 +60,7 @@ class ComparisonExpressionTransformer(Transformer):
     Specifically, subclasses can implement methods:
         "transform_or" for OR nodes
         "transform_and" for AND nodes
+        "transform_comparison" for plain comparison nodes (<prop> <op> <value>)
         "transform_default" for both types of nodes
 
     "transform_default" is a fallback, if a type-specific callback is not
@@ -69,8 +73,7 @@ class ComparisonExpressionTransformer(Transformer):
     method: a 2-tuple with the transformed AST and a boolean for change
     detection.  See doc for the superclass' method.
 
-    This process currently silently drops parenthetical nodes, and "leaf"
-    comparison expression nodes are left unchanged.
+    This process currently silently drops parenthetical nodes.
     """
 
     def transform(self, ast):
@@ -88,9 +91,7 @@ class ComparisonExpressionTransformer(Transformer):
                 changed = True
 
         elif isinstance(ast, _ComparisonExpression):
-            # Terminates recursion; we don't change these nodes
-            result = ast
-            changed = False
+            result, changed = self.__dispatch_transform(ast)
 
         elif isinstance(ast, ParentheticalExpression):
             # Drop these
@@ -116,6 +117,11 @@ class ComparisonExpressionTransformer(Transformer):
         elif isinstance(ast, OrBooleanExpression):
             meth = getattr(self, "transform_or", self.transform_default)
 
+        elif isinstance(ast, _ComparisonExpression):
+            meth = getattr(
+                self, "transform_comparison", self.transform_default
+            )
+
         else:
             meth = self.transform_default
 
@@ -142,7 +148,7 @@ class OrderDedupeTransformer(
         A or A => A
     """
 
-    def transform_default(self, ast):
+    def __transform(self, ast):
         """
         Sort/dedupe children.  AND and OR can be treated identically.
@@ -172,6 +178,12 @@ class OrderDedupeTransformer(
 
         return ast, changed
 
+    def transform_or(self, ast):
+        return self.__transform(ast)
+
+    def transform_and(self, ast):
+        return self.__transform(ast)
+
 
 class FlattenTransformer(ComparisonExpressionTransformer):
     """
@@ -182,7 +194,7 @@ class FlattenTransformer(ComparisonExpressionTransformer):
         (A) => A
     """
 
-    def transform_default(self, ast):
+    def __transform(self, ast):
         """
         Flatten children.  AND and OR can be treated mostly identically.  The
         little difference is that we can absorb AND children if we're an AND
         ourselves; and OR for OR.
 
         :param ast: The comparison expression AST
         :return: The same AST node, but with flattened children
         """
 
-        if isinstance(ast, _BooleanExpression) and len(ast.operands) == 1:
+        changed = False
+        if len(ast.operands) == 1:
             # Replace an AND/OR with one child, with the child itself.
ast = ast.operands[0] changed = True else: flat_operands = [] - changed = False for operand in ast.operands: if isinstance(operand, _BooleanExpression) \ and ast.operator == operand.operator: @@ -213,6 +225,12 @@ class FlattenTransformer(ComparisonExpressionTransformer): return ast, changed + def transform_or(self, ast): + return self.__transform(ast) + + def transform_and(self, ast): + return self.__transform(ast) + class AbsorptionTransformer( ComparisonExpressionTransformer @@ -224,57 +242,62 @@ class AbsorptionTransformer( A or (A and B) = A """ - def transform_default(self, ast): + def __transform(self, ast): changed = False - if isinstance(ast, _BooleanExpression): - secondary_op = "AND" if ast.operator == "OR" else "OR" + secondary_op = "AND" if ast.operator == "OR" else "OR" - to_delete = set() + to_delete = set() - # Check i (child1) against j to see if we can delete j. - for i, child1 in enumerate(ast.operands): - if i in to_delete: + # Check i (child1) against j to see if we can delete j. + for i, child1 in enumerate(ast.operands): + if i in to_delete: + continue + + for j, child2 in enumerate(ast.operands): + if i == j or j in to_delete: continue - for j, child2 in enumerate(ast.operands): - if i == j or j in to_delete: - continue + # We're checking if child1 is contained in child2, so + # child2 has to be a compound object, not just a simple + # comparison expression. We also require the right operator + # for child2: "AND" if ast is "OR" and vice versa. + if not isinstance(child2, _BooleanExpression) \ + or child2.operator != secondary_op: + continue - # We're checking if child1 is contained in child2, so - # child2 has to be a compound object, not just a simple - # comparison expression. We also require the right operator - # for child2: "AND" if ast is "OR" and vice versa. - if not isinstance(child2, _BooleanExpression) \ - or child2.operator != secondary_op: - continue + # The simple check: is child1 contained in child2? + if iter_in( + child1, child2.operands, comparison_expression_cmp + ): + to_delete.add(j) - # The simple check: is child1 contained in child2? - if iter_in( - child1, child2.operands, comparison_expression_cmp + # A more complicated check: does child1 occur in child2 + # in a "flattened" form? + elif child1.operator == child2.operator: + if all( + iter_in( + child1_operand, child2.operands, + comparison_expression_cmp + ) + for child1_operand in child1.operands ): to_delete.add(j) - # A more complicated check: does child1 occur in child2 - # in a "flattened" form? - elif child1.operator == child2.operator: - if all( - iter_in( - child1_operand, child2.operands, - comparison_expression_cmp - ) - for child1_operand in child1.operands - ): - to_delete.add(j) + if to_delete: + changed = True - if to_delete: - changed = True - - for i in reversed(sorted(to_delete)): - del ast.operands[i] + for i in reversed(sorted(to_delete)): + del ast.operands[i] return ast, changed + def transform_or(self, ast): + return self.__transform(ast) + + def transform_and(self, ast): + return self.__transform(ast) + class DNFTransformer(ComparisonExpressionTransformer): """ @@ -329,3 +352,26 @@ class DNFTransformer(ComparisonExpressionTransformer): result = ast return result, changed + + +class SpecialValueCanonicalization(ComparisonExpressionTransformer): + """ + Try to find particular leaf-node comparison expressions whose rhs (i.e. the + constant) can be canonicalized. 
This is an idiosyncratic transformation + based on some ideas people had for context-sensitive semantic equivalence + in constant values. + """ + def transform_comparison(self, ast): + if ast.lhs.object_type_name == "windows-registry-key": + windows_reg_key(ast) + + elif ast.lhs.object_type_name == "ipv4-addr": + ipv4_addr(ast) + + elif ast.lhs.object_type_name == "ipv6-addr": + ipv6_addr(ast) + + # Hard-code False here since this particular canonicalization is never + # worth doing more than once. I think it's okay to pretend nothing has + # changed. + return ast, False diff --git a/stix2/equivalence/patterns/transform/observation.py b/stix2/equivalence/patterns/transform/observation.py index 122a219..4470706 100644 --- a/stix2/equivalence/patterns/transform/observation.py +++ b/stix2/equivalence/patterns/transform/observation.py @@ -15,7 +15,8 @@ from stix2.equivalence.patterns.transform.comparison import ( FlattenTransformer as CFlattenTransformer, OrderDedupeTransformer as COrderDedupeTransformer, AbsorptionTransformer as CAbsorptionTransformer, - DNFTransformer as CDNFTransformer + DNFTransformer as CDNFTransformer, + SpecialValueCanonicalization ) from stix2.equivalence.patterns.compare import iter_lex_cmp, iter_in from stix2.equivalence.patterns.compare.observation import observation_expression_cmp @@ -473,9 +474,10 @@ class CanonicalizeComparisonExpressionsTransformer( simplify = ChainTransformer(comp_flatten, comp_order, comp_absorb) settle_simplify = SettleTransformer(simplify) + comp_special = SpecialValueCanonicalization() comp_dnf = CDNFTransformer() self.__comp_canonicalize = ChainTransformer( - settle_simplify, comp_dnf, settle_simplify + comp_special, settle_simplify, comp_dnf, settle_simplify ) def transform_observation(self, ast): diff --git a/stix2/equivalence/patterns/transform/specials.py b/stix2/equivalence/patterns/transform/specials.py new file mode 100644 index 0000000..c565e27 --- /dev/null +++ b/stix2/equivalence/patterns/transform/specials.py @@ -0,0 +1,215 @@ +""" +Some simple comparison expression canonicalization functions. +""" +import socket +from stix2.equivalence.patterns.compare.comparison import ( + object_path_to_raw_values +) + + +# Values we can use as wildcards in path patterns +_ANY_IDX = object() +_ANY_KEY = object() +_ANY = object() + + +def _path_is(object_path, path_pattern): + """ + Compare an object path against a pattern. This enables simple path + recognition based on a pattern, which is slightly more flexible than exact + equality: it supports some simple wildcards. + + The path pattern must be an iterable of values: strings for key path steps, + ints or "*" for index path steps, or wildcards. Exact matches are required + for non-wildcards in the pattern. For the wildcards, _ANY_IDX matches any + index path step; _ANY_KEY matches any key path step, and _ANY matches any + path step. 
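+
+    E.g. (illustrative): an object path for "values[*].name" matches the
+    pattern ("values", _ANY_IDX, "name").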
+ + :param object_path: An ObjectPath instance + :param path_pattern: An iterable giving the pattern path steps + :return: True if the path matches the pattern; False if not + """ + path_values = object_path_to_raw_values(object_path) + + path_iter = iter(path_values) + patt_iter = iter(path_pattern) + + result = True + while True: + path_val = next(path_iter, None) + patt_val = next(patt_iter, None) + + if path_val is None and patt_val is None: + # equal length sequences; no differences found + break + + elif path_val is None or patt_val is None: + # unequal length sequences + result = False + break + + elif patt_val is _ANY_IDX: + if not isinstance(path_val, int) and path_val != "*": + result = False + break + + elif patt_val is _ANY_KEY: + if not isinstance(path_val, str): + result = False + break + + elif patt_val is not _ANY and patt_val != path_val: + result = False + break + + return result + + +def _mask_bytes(ip_bytes, prefix_size): + """ + Retain the high-order 'prefix_size' bits from ip_bytes, and zero out the + remaining low-order bits. This side-effects ip_bytes. + + :param ip_bytes: A mutable byte sequence (e.g. a bytearray) + :param prefix_size: An integer prefix size + """ + addr_size_bytes = len(ip_bytes) + addr_size_bits = 8 * addr_size_bytes + + assert 0 <= prefix_size <= addr_size_bits + + num_fixed_bytes = prefix_size // 8 + num_zero_bytes = (addr_size_bits - prefix_size) // 8 + + if num_zero_bytes > 0: + ip_bytes[addr_size_bytes - num_zero_bytes:] = b"\x00" * num_zero_bytes + + if num_fixed_bytes + num_zero_bytes != addr_size_bytes: + # The address boundary doesn't fall on a byte boundary. + # So we have a byte for which we have to zero out some + # bits. + num_1_bits = prefix_size % 8 + mask = ((1 << num_1_bits) - 1) << (8 - num_1_bits) + ip_bytes[num_fixed_bytes] &= mask + + +def windows_reg_key(comp_expr): + """ + Lower-cases the rhs, depending on the windows-registry-key property + being compared. This enables case-insensitive comparisons between two + patterns, for those values. This side-effects the given AST. + + :param comp_expr: A _ComparisonExpression object whose type is + windows-registry-key + """ + if _path_is(comp_expr.lhs, ("key",)) \ + or _path_is(comp_expr.lhs, ("values", _ANY_IDX, "name")): + comp_expr.rhs.value = comp_expr.rhs.value.lower() + + +def ipv4_addr(comp_expr): + """ + Canonicalizes a CIDR IPv4 address by zeroing out low-order bits, according + to the prefix size. This affects the rhs when the "value" property of an + ipv4-addr is being compared. If the prefix size is 32, the size suffix is + simply dropped since it's redundant. If the value is not a valid CIDR + address, then no change is made. This also runs the address through the + platform's IPv4 address processing functions (inet_aton() and inet_ntoa()), + which can adjust the format. + + This side-effects the given AST. + + :param comp_expr: A _ComparisonExpression object whose type is ipv4-addr. + """ + if _path_is(comp_expr.lhs, ("value",)): + value = comp_expr.rhs.value + slash_idx = value.find("/") + + if 0 <= slash_idx < len(value)-1: + ip_str = value[:slash_idx] + try: + ip_bytes = socket.inet_aton(ip_str) + except OSError: + # illegal IPv4 address string + return + + try: + prefix_size = int(value[slash_idx+1:]) + except ValueError: + # illegal prefix size + return + + if prefix_size < 0 or prefix_size > 32: + # illegal prefix size + return + + if prefix_size == 32: + # Drop the "32" since it's redundant. 
Run the address bytes + # through inet_ntoa() in case it would adjust the format (e.g. + # drop leading zeros: 1.2.3.004 => 1.2.3.4). + value = socket.inet_ntoa(ip_bytes) + + else: + # inet_aton() gives an immutable 'bytes' value; we need a value + # we can change. + ip_bytes = bytearray(ip_bytes) + _mask_bytes(ip_bytes, prefix_size) + + ip_str = socket.inet_ntoa(ip_bytes) + value = ip_str + "/" + str(prefix_size) + + comp_expr.rhs.value = value + + +def ipv6_addr(comp_expr): + """ + Canonicalizes a CIDR IPv6 address by zeroing out low-order bits, according + to the prefix size. This affects the rhs when the "value" property of an + ipv6-addr is being compared. If the prefix size is 128, the size suffix is + simply dropped since it's redundant. If the value is not a valid CIDR + address, then no change is made. This also runs the address through the + platform's IPv6 address processing functions (inet_pton() and inet_ntop()), + which can adjust the format. + + This side-effects the given AST. + + :param comp_expr: A _ComparisonExpression object whose type is ipv6-addr. + """ + if _path_is(comp_expr.lhs, ("value",)): + value = comp_expr.rhs.value + slash_idx = value.find("/") + + if 0 <= slash_idx < len(value)-1: + ip_str = value[:slash_idx] + try: + ip_bytes = socket.inet_pton(socket.AF_INET6, ip_str) + except OSError: + # illegal IPv6 address string + return + + try: + prefix_size = int(value[slash_idx+1:]) + except ValueError: + # illegal prefix size + return + + if prefix_size < 0 or prefix_size > 128: + # illegal prefix size + return + + if prefix_size == 128: + # Drop the "128" since it's redundant. Run the IP address + # through inet_ntop() so it can reformat with the double-colons + # (and make any other adjustments) if necessary. + value = socket.inet_ntop(socket.AF_INET6, ip_bytes) + + else: + # inet_pton() gives an immutable 'bytes' value; we need a value + # we can change. + ip_bytes = bytearray(ip_bytes) + _mask_bytes(ip_bytes, prefix_size) + + ip_str = socket.inet_ntop(socket.AF_INET6, ip_bytes) + value = ip_str + "/" + str(prefix_size) + + comp_expr.rhs.value = value From 6c92f670cb824700c34e04b1493215cd5aaae80d Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Thu, 13 Aug 2020 16:22:24 -0400 Subject: [PATCH 03/10] Fix ipv4/6 special canonicalizers to reformat IP addresses even when a non-CIDR address is used. Before, it left plain IP addresses untouched. --- .../patterns/transform/specials.py | 88 +++++++++++-------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/stix2/equivalence/patterns/transform/specials.py b/stix2/equivalence/patterns/transform/specials.py index c565e27..0eba091 100644 --- a/stix2/equivalence/patterns/transform/specials.py +++ b/stix2/equivalence/patterns/transform/specials.py @@ -124,15 +124,20 @@ def ipv4_addr(comp_expr): if _path_is(comp_expr.lhs, ("value",)): value = comp_expr.rhs.value slash_idx = value.find("/") + is_cidr = slash_idx >= 0 - if 0 <= slash_idx < len(value)-1: + if is_cidr: ip_str = value[:slash_idx] - try: - ip_bytes = socket.inet_aton(ip_str) - except OSError: - # illegal IPv4 address string - return + else: + ip_str = value + try: + ip_bytes = socket.inet_aton(ip_str) + except OSError: + # illegal IPv4 address string + return + + if is_cidr: try: prefix_size = int(value[slash_idx+1:]) except ValueError: @@ -143,22 +148,23 @@ def ipv4_addr(comp_expr): # illegal prefix size return - if prefix_size == 32: - # Drop the "32" since it's redundant. 
Run the address bytes - # through inet_ntoa() in case it would adjust the format (e.g. - # drop leading zeros: 1.2.3.004 => 1.2.3.4). - value = socket.inet_ntoa(ip_bytes) + if not is_cidr or prefix_size == 32: + # If a CIDR with prefix size 32, drop the prefix size since it's + # redundant. Run the address bytes through inet_ntoa() in case it + # would adjust the format (e.g. drop leading zeros: + # 1.2.3.004 => 1.2.3.4). + value = socket.inet_ntoa(ip_bytes) - else: - # inet_aton() gives an immutable 'bytes' value; we need a value - # we can change. - ip_bytes = bytearray(ip_bytes) - _mask_bytes(ip_bytes, prefix_size) + else: + # inet_aton() gives an immutable 'bytes' value; we need a value + # we can change. + ip_bytes = bytearray(ip_bytes) + _mask_bytes(ip_bytes, prefix_size) - ip_str = socket.inet_ntoa(ip_bytes) - value = ip_str + "/" + str(prefix_size) + ip_str = socket.inet_ntoa(ip_bytes) + value = ip_str + "/" + str(prefix_size) - comp_expr.rhs.value = value + comp_expr.rhs.value = value def ipv6_addr(comp_expr): @@ -178,15 +184,20 @@ def ipv6_addr(comp_expr): if _path_is(comp_expr.lhs, ("value",)): value = comp_expr.rhs.value slash_idx = value.find("/") + is_cidr = slash_idx >= 0 - if 0 <= slash_idx < len(value)-1: + if is_cidr: ip_str = value[:slash_idx] - try: - ip_bytes = socket.inet_pton(socket.AF_INET6, ip_str) - except OSError: - # illegal IPv6 address string - return + else: + ip_str = value + try: + ip_bytes = socket.inet_pton(socket.AF_INET6, ip_str) + except OSError: + # illegal IPv6 address string + return + + if is_cidr: try: prefix_size = int(value[slash_idx+1:]) except ValueError: @@ -197,19 +208,20 @@ def ipv6_addr(comp_expr): # illegal prefix size return - if prefix_size == 128: - # Drop the "128" since it's redundant. Run the IP address - # through inet_ntop() so it can reformat with the double-colons - # (and make any other adjustments) if necessary. - value = socket.inet_ntop(socket.AF_INET6, ip_bytes) + if not is_cidr or prefix_size == 128: + # If a CIDR with prefix size 128, drop the prefix size since it's + # redundant. Run the IP address through inet_ntop() so it can + # reformat with the double-colons (and make any other adjustments) + # if necessary. + value = socket.inet_ntop(socket.AF_INET6, ip_bytes) - else: - # inet_pton() gives an immutable 'bytes' value; we need a value - # we can change. - ip_bytes = bytearray(ip_bytes) - _mask_bytes(ip_bytes, prefix_size) + else: + # inet_pton() gives an immutable 'bytes' value; we need a value + # we can change. + ip_bytes = bytearray(ip_bytes) + _mask_bytes(ip_bytes, prefix_size) - ip_str = socket.inet_ntop(socket.AF_INET6, ip_bytes) - value = ip_str + "/" + str(prefix_size) + ip_str = socket.inet_ntop(socket.AF_INET6, ip_bytes) + value = ip_str + "/" + str(prefix_size) - comp_expr.rhs.value = value + comp_expr.rhs.value = value From bd5635f5be4f30ccef5b8e86f5454de0ab675c75 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Thu, 13 Aug 2020 16:46:25 -0400 Subject: [PATCH 04/10] Add some unit tests for pattern equivalence. 
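The suites exercise the public equivalent_patterns() entry point directly; for example (two cases from the tests below), equivalent_patterns("[a:b=1] OR [a:b=1]", "[a:b=1]") is True because duplicate operands of OR are collapsed, while equivalent_patterns("[a:b=1] AND [a:b=1]", "[a:b=1]") is False because duplicates are only deduped across OR.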
--- stix2/test/test_pattern_equivalence.py | 571 +++++++++++++++++++++++++ 1 file changed, 571 insertions(+) create mode 100644 stix2/test/test_pattern_equivalence.py diff --git a/stix2/test/test_pattern_equivalence.py b/stix2/test/test_pattern_equivalence.py new file mode 100644 index 0000000..73eca58 --- /dev/null +++ b/stix2/test/test_pattern_equivalence.py @@ -0,0 +1,571 @@ +import pytest +from stix2.equivalence.patterns import equivalent_patterns + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] OR [a:b=1]", + "[a:b=1]" + ), + ( + "[a:b=1] OR [a:b=1] OR [a:b=1]", + "[a:b=1]" + ), + ] +) +def test_obs_dupe_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND [a:b=1]", + "[a:b=1]" + ), + ( + "[a:b=1] FOLLOWEDBY [a:b=1]", + "[a:b=1]" + ), + ] +) +def test_obs_dupe_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ("[a:b=1]", "([a:b=1])"), + ("(((([a:b=1]))))", "([a:b=1])"), + ( + "[a:b=1] AND ([a:b=2] AND [a:b=3])", + "[a:b=1] AND [a:b=2] AND [a:b=3]", + ), + ( + "([a:b=1] AND [a:b=2]) AND [a:b=3]", + "[a:b=1] AND ([a:b=2] AND [a:b=3])", + ), + ( + "[a:b=1] OR ([a:b=2] OR [a:b=3])", + "[a:b=1] OR [a:b=2] OR [a:b=3]", + ), + ( + "([a:b=1] OR [a:b=2]) OR [a:b=3]", + "[a:b=1] OR ([a:b=2] OR [a:b=3])", + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3])", + "[a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]", + ), + ( + "([a:b=1] FOLLOWEDBY [a:b=2]) FOLLOWEDBY [a:b=3]", + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3])", + ), + ( + "[a:b=1] AND ([a:b=2] AND ([a:b=3] AND [a:b=4])) AND ([a:b=5])", + "([a:b=1] AND ([a:b=2] AND [a:b=3]) AND ([a:b=4] AND [a:b=5]))", + ) + ] +) +def test_obs_flatten_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "([a:b=1] AND [a:b=2]) OR [a:b=3]", + "[a:b=1] AND ([a:b=2] OR [a:b=3])", + ), + ( + "([a:b=1] OR [a:b=2]) FOLLOWEDBY [a:b=3]", + "[a:b=1] OR ([a:b=2] FOLLOWEDBY [a:b=3])", + ), + ("[a:b=1]", "([a:b=1]) REPEATS 2 TIMES"), + ("(((([a:b=1]))))", "([a:b=1] REPEATS 2 TIMES)"), + ( + "[a:b=1] AND ([a:b=2] AND [a:b=3]) WITHIN 2 SECONDS", + "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] AND [a:b=3]", + ), + ( + "[a:b=1] OR ([a:b=2] OR [a:b=3]) WITHIN 2 SECONDS", + "[a:b=1] WITHIN 2 SECONDS OR [a:b=2] OR [a:b=3]", + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3]) WITHIN 2 SECONDS", + "[a:b=1] WITHIN 2 SECONDS FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]", + ), + ] +) +def test_obs_flatten_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND [a:b=2]", + "[a:b=2] AND [a:b=1]" + ), + ( + "[a:b=1] OR [a:b=2]", + "[a:b=2] OR [a:b=1]" + ), + ( + "[a:b=1] OR ([a:b=2] AND [a:b=3])", + "([a:b=3] AND [a:b=2]) OR [a:b=1]" + ), + ( + "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES", + "[a:b=2] REPEATS 2 TIMES AND [a:b=1] WITHIN 2 SECONDS" + ) + ] +) +def test_obs_order_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] FOLLOWEDBY [a:b=2]", + "[a:b=2] FOLLOWEDBY [a:b=1]" + ), + ( + "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES", + "[a:b=1] REPEATS 2 TIMES AND [a:b=2] WITHIN 2 SECONDS" + ) + ] +) +def test_obs_order_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, 
patt2", [ + ( + "[a:b=1] OR ([a:b=1] AND [a:b=2])", + "[a:b=1]" + ), + ( + "[a:b=1] OR ([a:b=1] FOLLOWEDBY [a:b=2])", + "[a:b=1]" + ), + ( + "([a:b=3] AND [a:b=1]) OR ([a:b=1] AND [a:b=2] AND [a:b=3])", + "[a:b=3] AND [a:b=1]" + ), + ( + "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=4] FOLLOWEDBY [a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])", + "[a:b=1] FOLLOWEDBY [a:b=3]" + ), + ( + "([a:b=1] FOLLOWEDBY [a:b=2]) OR (([a:b=1] FOLLOWEDBY [a:b=2]) AND [a:b=3])", + "[a:b=1] FOLLOWEDBY [a:b=2]" + ), + ( + "([a:b=1] AND [a:b=2]) OR (([a:b=1] AND [a:b=2]) FOLLOWEDBY [a:b=3])", + "[a:b=1] AND [a:b=2]" + ), + ] +) +def test_obs_absorb_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "([a:b=1] AND [a:b=2]) OR ([a:b=2] AND [a:b=3] AND [a:b=4])", + "[a:b=1] AND [a:b=2]" + ), + ( + "([a:b=2] FOLLOWEDBY [a:b=1]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])", + "[a:b=2] FOLLOWEDBY [a:b=1]" + ) + ] +) +def test_obs_absorb_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND ([a:b=2] OR [a:b=3])", + "([a:b=1] AND [a:b=2]) OR ([a:b=1] AND [a:b=3])" + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] OR [a:b=3])", + "([a:b=1] FOLLOWEDBY [a:b=2]) OR ([a:b=1] FOLLOWEDBY [a:b=3])" + ), + ( + "[a:b=1] AND ([a:b=2] AND ([a:b=3] OR [a:b=4]))", + "([a:b=1] AND [a:b=2] AND [a:b=3]) OR ([a:b=1] AND [a:b=2] AND [a:b=4])" + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY ([a:b=3] OR [a:b=4]))", + "([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=4])" + ), + ( + "([a:b=1] OR [a:b=2]) AND ([a:b=3] OR [a:b=4])", + "([a:b=1] AND [a:b=3]) OR ([a:b=1] AND [a:b=4]) OR ([a:b=2] AND [a:b=3]) OR ([a:b=2] AND [a:b=4])" + ), + ( + "([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=3] OR [a:b=4])", + "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])" + ), + ] +) +def test_obs_dnf_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND [a:b=2]", + "[a:b=1] OR [a:b=2]" + ), + ( + "[a:b=1] AND ([a:b=2] OR [a:b=3])", + "([a:b=1] AND [a:b=2]) OR [a:b=3]" + ), + ( + "[a:b=1] WITHIN 2 SECONDS", + "[a:b=1] REPEATS 2 TIMES" + ) + ] +) +def test_obs_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +# # # # +# # Comparison expression equivalence tests # # +# # # # + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 AND a:b=1]", + "[a:b=1]" + ), + ( + "[a:b=1 AND a:b=1 AND a:b=1]", + "[a:b=1]" + ), + ( + "[a:b=1 OR a:b=1]", + "[a:b=1]" + ), + ( + "[a:b=1 OR a:b=1 OR a:b=1]", + "[a:b=1]" + ) + ] +) +def test_comp_dupe_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[(a:b=1)]", + "[a:b=1]" + ), + ( + "[(((((a:b=1)))))]", + "[(a:b=1)]" + ), + ( + "[a:b=1 AND (a:b=2 AND a:b=3)]", + "[(a:b=1 AND a:b=2) AND a:b=3]" + ), + ( + "[a:b=1 OR (a:b=2 OR a:b=3)]", + "[(a:b=1 OR a:b=2) OR a:b=3]" + ), + ( + "[(((a:b=1 AND ((a:b=2) AND a:b=3) AND (a:b=4))))]", + "[a:b=1 AND a:b=2 AND a:b=3 AND a:b=4]" + ), + ( + "[(((a:b=1 OR ((a:b=2) OR a:b=3) OR (a:b=4))))]", + "[a:b=1 OR a:b=2 OR a:b=3 OR a:b=4]" + ), + ] +) +def test_comp_flatten_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 AND a:b=2]", + "[a:b=2 
AND a:b=1]" + ), + ( + "[a:b=1 OR a:b=2]", + "[a:b=2 OR a:b=1]" + ), + ( + "[(a:b=1 OR a:b=2) AND a:b=3]", + "[a:b=3 AND (a:b=2 OR a:b=1)]", + ) + ] +) +def test_comp_order_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 OR (a:b=1 AND a:b=2)]", + "[a:b=1]" + ), + ( + "[a:b=1 AND (a:b=1 OR a:b=2)]", + "[a:b=1]" + ), + ( + "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=2 AND a:b=1)]", + "[a:b=1 AND a:b=2]" + ), + ( + "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=2 OR a:b=1)]", + "[a:b=1 OR a:b=2]" + ) + ] +) +def test_comp_absorb_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 OR (a:b=2 AND a:b=3)]", + "[(a:b=1 OR a:b=2) AND (a:b=1 OR a:b=3)]" + ), + ( + "[a:b=1 AND (a:b=2 OR a:b=3)]", + "[(a:b=1 AND a:b=2) OR (a:b=1 AND a:b=3)]" + ), + ( + "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=4)]", + "[(a:b=1 OR a:b=3) AND (a:b=1 OR a:b=4) AND (a:b=2 OR a:b=3) AND (a:b=2 OR a:b=4)]" + ), + ( + "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=4)]", + "[(a:b=1 AND a:b=3) OR (a:b=1 AND a:b=4) OR (a:b=2 AND a:b=3) OR (a:b=2 AND a:b=4)]" + ), + ( + "[a:b=1 AND (a:b=2 AND (a:b=3 OR a:b=4))]", + "[(a:b=1 AND a:b=2 AND a:b=3) OR (a:b=1 AND a:b=2 AND a:b=4)]" + ) + ] +) +def test_comp_dnf_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1]", + "[a:b=2]" + ), + ( + "[a:b=1 AND a:b=2]", + "[a:b=1 OR a:b=2]" + ), + ( + "[(a:b=1 AND a:b=2) OR a:b=3]", + "[a:b=1 AND (a:b=2 OR a:b=3)]" + ), + ] +) +def test_comp_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv4-addr:value='1.2.3.4/32']", + "[ipv4-addr:value='1.2.3.4']" + ), + ( + "[ipv4-addr:value='1.2.3.4/24']", + "[ipv4-addr:value='1.2.3.0/24']" + ), + ( + "[ipv4-addr:value='1.2.255.4/23']", + "[ipv4-addr:value='1.2.254.0/23']" + ), + ( + "[ipv4-addr:value='1.2.255.4/20']", + "[ipv4-addr:value='1.2.240.0/20']" + ), + ( + "[ipv4-addr:value='1.2.255.4/0']", + "[ipv4-addr:value='0.0.0.0/0']" + ), + ( + "[ipv4-addr:value='01.02.03.04']", + "[ipv4-addr:value='1.2.3.4']" + ), + ( + "[ipv4-addr:value='1.2.3.4/-5']", + "[ipv4-addr:value='1.2.3.4/-5']" + ), + ( + "[ipv4-addr:value='1.2.3.4/99']", + "[ipv4-addr:value='1.2.3.4/99']" + ), + ( + "[ipv4-addr:value='foo']", + "[ipv4-addr:value='foo']" + ), + ] +) +def test_comp_special_canonicalization_ipv4(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv4-addr:value='1.2.3.4']", + "[ipv4-addr:value='1.2.3.5']" + ), + ( + "[ipv4-addr:value='1.2.3.4/1']", + "[ipv4-addr:value='1.2.3.4/2']" + ), + ( + "[ipv4-addr:value='foo']", + "[ipv4-addr:value='bar']" + ), + ] +) +def test_comp_special_canonicalization_ipv4_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/128']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8']" + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/112']", + "[ipv6-addr:value='1:2:3:4:5:6:7:0/112']" + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/111']", + "[ipv6-addr:value='1:2:3:4:5:6:fffe:0/111']" + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/104']", + "[ipv6-addr:value='1:2:3:4:5:6:ff00:0/104']" + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/0']", + "[ipv6-addr:value='0:0:0:0:0:0:0:0/0']" + ), + ( + 
"[ipv6-addr:value='0001:0000:0000:0000:0000:0000:0000:0001']", + "[ipv6-addr:value='1::1']" + ), + ( + "[ipv6-addr:value='0000:0000:0000:0000:0000:0000:0000:0000']", + "[ipv6-addr:value='::']" + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']" + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']" + ), + ( + "[ipv6-addr:value='foo']", + "[ipv6-addr:value='foo']" + ), + ] +) +def test_comp_special_canonicalization_ipv6(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8']", + "[ipv6-addr:value='1:2:3:4:5:6:7:9']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/1']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8/2']", + ), + ( + "[ipv6-addr:value='foo']", + "[ipv6-addr:value='bar']", + ), + ] +) +def test_comp_special_canonicalization_ipv6_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[windows-registry-key:key = 'aaa']", + "[windows-registry-key:key = 'AAA']", + ), + ( + "[windows-registry-key:values[0].name = 'aaa']", + "[windows-registry-key:values[0].name = 'AAA']", + ), + ( + "[windows-registry-key:values[*].name = 'aaa']", + "[windows-registry-key:values[*].name = 'AAA']", + ), + ] +) +def test_comp_special_canonicalization_win_reg_key(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[windows-registry-key:key='foo']", + "[windows-registry-key:key='bar']", + ), + ( + "[windows-registry-key:values[0].name='foo']", + "[windows-registry-key:values[0].name='bar']", + ), + ( + "[windows-registry-key:values[*].name='foo']", + "[windows-registry-key:values[*].name='bar']", + ), + ( + "[windows-registry-key:values[*].data='foo']", + "[windows-registry-key:values[*].data='FOO']", + ), + ] +) +def test_comp_special_canonicalization_win_reg_key_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) From 16a8c544ac3aba5fc6ab6fdf46c3c9f6db2670c8 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Thu, 13 Aug 2020 17:09:04 -0400 Subject: [PATCH 05/10] Add a find_equivalent_patterns() function and unit tests, in case a user wants a more efficient search capability. (It is more efficient than calling equivalent_patterns() over and over in a loop, because it doesn't repeatedly re-canonicalize the search pattern.) --- stix2/equivalence/patterns/__init__.py | 34 ++++++++++++++++++++++++ stix2/test/test_pattern_equivalence.py | 36 +++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/stix2/equivalence/patterns/__init__.py b/stix2/equivalence/patterns/__init__.py index 9965c35..0d0aa2a 100644 --- a/stix2/equivalence/patterns/__init__.py +++ b/stix2/equivalence/patterns/__init__.py @@ -70,3 +70,37 @@ def equivalent_patterns(pattern1, pattern2): result = observation_expression_cmp(canon_patt1, canon_patt2) return result == 0 + + +def find_equivalent_patterns(search_pattern, patterns): + """ + Find patterns from a sequence which are equivalent to a given pattern. + This is more efficient than using equivalent_patterns() in a loop, because + it doesn't re-canonicalize the search pattern over and over. This works + on an input iterable and is implemented as a generator of matches. So you + can "stream" patterns in and matching patterns will be streamed out. 
+ + :param search_pattern: A search pattern as a string + :param patterns: An iterable over patterns as strings + :return: A generator iterator producing the semantically equivalent + patterns + """ + search_pattern_ast = stix2.pattern_visitor.create_pattern_object( + search_pattern + ) + + pattern_canonicalizer = _get_pattern_canonicalizer() + canon_search_pattern_ast, _ = pattern_canonicalizer.transform( + search_pattern_ast + ) + + for pattern in patterns: + pattern_ast = stix2.pattern_visitor.create_pattern_object(pattern) + canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast) + + result = observation_expression_cmp( + canon_search_pattern_ast, canon_pattern_ast + ) + + if result == 0: + yield pattern diff --git a/stix2/test/test_pattern_equivalence.py b/stix2/test/test_pattern_equivalence.py index 73eca58..0488358 100644 --- a/stix2/test/test_pattern_equivalence.py +++ b/stix2/test/test_pattern_equivalence.py @@ -1,5 +1,12 @@ import pytest -from stix2.equivalence.patterns import equivalent_patterns +from stix2.equivalence.patterns import ( + equivalent_patterns, find_equivalent_patterns +) + + +# # # # +# # Observation expression equivalence tests # # +# # # # @pytest.mark.parametrize( @@ -569,3 +576,30 @@ def test_comp_special_canonicalization_win_reg_key(patt1, patt2): ) def test_comp_special_canonicalization_win_reg_key_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) + + +# # # # +# # find_equivalent_patterns() tests # # +# # # # + +def test_find_equivalent_patterns(): + search_pattern = "[a:b=1]" + other_patterns = [ + "[a:b=2]", + "[a:b=1]", + "[a:b=1] WITHIN 1 SECONDS", + "[a:b=1] OR ([a:b=2] AND [a:b=1])", + "[(a:b=2 OR a:b=1) AND a:b=1]", + "[c:d=1]", + "[a:b>1]" + ] + + result = list( + find_equivalent_patterns(search_pattern, other_patterns) + ) + + assert result == [ + "[a:b=1]", + "[a:b=1] OR ([a:b=2] AND [a:b=1])", + "[(a:b=2 OR a:b=1) AND a:b=1]", + ] From c21b230edb78f0fa5e60704daac77b3d4ce93954 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Thu, 13 Aug 2020 17:44:42 -0400 Subject: [PATCH 06/10] pre-commit hook stylistic changes --- stix2/equivalence/patterns/__init__.py | 28 +-- .../equivalence/patterns/compare/__init__.py | 1 + .../patterns/compare/comparison.py | 28 +-- .../patterns/compare/observation.py | 29 ++- .../patterns/transform/__init__.py | 3 +- .../patterns/transform/comparison.py | 31 +-- .../patterns/transform/observation.py | 59 ++--- .../patterns/transform/specials.py | 6 +- stix2/pattern_visitor.py | 10 +- stix2/test/test_pattern_equivalence.py | 218 +++++++++--------- stix2/test/v21/test_pattern_expressions.py | 1 + 11 files changed, 211 insertions(+), 203 deletions(-) diff --git a/stix2/equivalence/patterns/__init__.py b/stix2/equivalence/patterns/__init__.py index 0d0aa2a..c371ca3 100644 --- a/stix2/equivalence/patterns/__init__.py +++ b/stix2/equivalence/patterns/__init__.py @@ -1,18 +1,14 @@ -import stix2.pattern_visitor -from stix2.equivalence.patterns.transform import ( - ChainTransformer, SettleTransformer -) from stix2.equivalence.patterns.compare.observation import ( - observation_expression_cmp + observation_expression_cmp, +) +from stix2.equivalence.patterns.transform import ( + ChainTransformer, SettleTransformer, ) from stix2.equivalence.patterns.transform.observation import ( - CanonicalizeComparisonExpressionsTransformer, - AbsorptionTransformer, - FlattenTransformer, - DNFTransformer, - OrderDedupeTransformer + AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer, + 
DNFTransformer, FlattenTransformer, OrderDedupeTransformer, ) - +import stix2.pattern_visitor # Lazy-initialize _pattern_canonicalizer = None @@ -38,7 +34,7 @@ def _get_pattern_canonicalizer(): obs_expr_order = OrderDedupeTransformer() obs_expr_absorb = AbsorptionTransformer() obs_simplify = ChainTransformer( - obs_expr_flatten, obs_expr_order, obs_expr_absorb + obs_expr_flatten, obs_expr_order, obs_expr_absorb, ) obs_settle_simplify = SettleTransformer(obs_simplify) @@ -46,7 +42,7 @@ def _get_pattern_canonicalizer(): _pattern_canonicalizer = ChainTransformer( canonicalize_comp_expr, - obs_settle_simplify, obs_dnf, obs_settle_simplify + obs_settle_simplify, obs_dnf, obs_settle_simplify, ) return _pattern_canonicalizer @@ -86,12 +82,12 @@ def find_equivalent_patterns(search_pattern, patterns): patterns """ search_pattern_ast = stix2.pattern_visitor.create_pattern_object( - search_pattern + search_pattern, ) pattern_canonicalizer = _get_pattern_canonicalizer() canon_search_pattern_ast, _ = pattern_canonicalizer.transform( - search_pattern_ast + search_pattern_ast, ) for pattern in patterns: @@ -99,7 +95,7 @@ def find_equivalent_patterns(search_pattern, patterns): canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast) result = observation_expression_cmp( - canon_search_pattern_ast, canon_pattern_ast + canon_search_pattern_ast, canon_pattern_ast, ) if result == 0: diff --git a/stix2/equivalence/patterns/compare/__init__.py b/stix2/equivalence/patterns/compare/__init__.py index a80de4f..e4bcc8f 100644 --- a/stix2/equivalence/patterns/compare/__init__.py +++ b/stix2/equivalence/patterns/compare/__init__.py @@ -2,6 +2,7 @@ Some generic comparison utility functions. """ + def generic_cmp(value1, value2): """ Generic comparator of values which uses the builtin '<' and '>' operators. diff --git a/stix2/equivalence/patterns/compare/comparison.py b/stix2/equivalence/patterns/compare/comparison.py index 03b16f4..ed717fc 100644 --- a/stix2/equivalence/patterns/compare/comparison.py +++ b/stix2/equivalence/patterns/compare/comparison.py @@ -3,18 +3,18 @@ Comparison utilities for STIX pattern comparison expressions. """ import base64 import functools -from stix2.patterns import ( - _ComparisonExpression, AndBooleanExpression, OrBooleanExpression, - ListObjectPathComponent, IntegerConstant, FloatConstant, StringConstant, - BooleanConstant, TimestampConstant, HexConstant, BinaryConstant, - ListConstant -) -from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp +from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp +from stix2.patterns import ( + AndBooleanExpression, BinaryConstant, BooleanConstant, FloatConstant, + HexConstant, IntegerConstant, ListConstant, ListObjectPathComponent, + OrBooleanExpression, StringConstant, TimestampConstant, + _ComparisonExpression, +) _COMPARISON_OP_ORDER = ( "=", "!=", "<>", "<", "<=", ">", ">=", - "IN", "LIKE", "MATCHES", "ISSUBSET", "ISSUPERSET" + "IN", "LIKE", "MATCHES", "ISSUBSET", "ISSUPERSET", ) @@ -23,7 +23,7 @@ _CONSTANT_TYPE_ORDER = ( # treated equally as a generic "number" type. So they aren't in this list. # See constant_cmp(). StringConstant, BooleanConstant, - TimestampConstant, HexConstant, BinaryConstant, ListConstant + TimestampConstant, HexConstant, BinaryConstant, ListConstant, ) @@ -111,11 +111,11 @@ def list_cmp(value1, value2): # Achieve order-independence by sorting the lists first. 
sorted_value1 = sorted( - value1.value, key=functools.cmp_to_key(constant_cmp) + value1.value, key=functools.cmp_to_key(constant_cmp), ) sorted_value2 = sorted( - value2.value, key=functools.cmp_to_key(constant_cmp) + value2.value, key=functools.cmp_to_key(constant_cmp), ) result = iter_lex_cmp(sorted_value1, sorted_value2, constant_cmp) @@ -131,7 +131,7 @@ _CONSTANT_COMPARATORS = { TimestampConstant: generic_constant_cmp, HexConstant: hex_cmp, BinaryConstant: bin_cmp, - ListConstant: list_cmp + ListConstant: list_cmp, } @@ -214,7 +214,7 @@ def object_path_cmp(path1, path2): path_vals1 = object_path_to_raw_values(path1) path_vals2 = object_path_to_raw_values(path2) result = iter_lex_cmp( - path_vals1, path_vals2, object_path_component_cmp + path_vals1, path_vals2, object_path_component_cmp, ) return result @@ -345,7 +345,7 @@ def comparison_expression_cmp(expr1, expr2): # This will order according to recursive invocations of this comparator, # on sub-expressions. result = iter_lex_cmp( - expr1.operands, expr2.operands, comparison_expression_cmp + expr1.operands, expr2.operands, comparison_expression_cmp, ) return result diff --git a/stix2/equivalence/patterns/compare/observation.py b/stix2/equivalence/patterns/compare/observation.py index 66513da..227b8ae 100644 --- a/stix2/equivalence/patterns/compare/observation.py +++ b/stix2/equivalence/patterns/compare/observation.py @@ -3,24 +3,23 @@ Comparison utilities for STIX pattern observation expressions. """ from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp from stix2.equivalence.patterns.compare.comparison import ( - comparison_expression_cmp, generic_constant_cmp + comparison_expression_cmp, generic_constant_cmp, ) from stix2.patterns import ( - ObservationExpression, AndObservationExpression, OrObservationExpression, - QualifiedObservationExpression, _CompoundObservationExpression, - RepeatQualifier, WithinQualifier, StartStopQualifier, - FollowedByObservationExpression + AndObservationExpression, FollowedByObservationExpression, + ObservationExpression, OrObservationExpression, + QualifiedObservationExpression, RepeatQualifier, StartStopQualifier, + WithinQualifier, _CompoundObservationExpression, ) - _OBSERVATION_EXPRESSION_TYPE_ORDER = ( ObservationExpression, AndObservationExpression, OrObservationExpression, - FollowedByObservationExpression, QualifiedObservationExpression + FollowedByObservationExpression, QualifiedObservationExpression, ) _QUALIFIER_TYPE_ORDER = ( - RepeatQualifier, WithinQualifier, StartStopQualifier + RepeatQualifier, WithinQualifier, StartStopQualifier, ) @@ -36,7 +35,7 @@ def within_cmp(qual1, qual2): Compare WITHIN qualifiers. This orders by number of seconds. 
""" return generic_constant_cmp( - qual1.number_of_seconds, qual2.number_of_seconds + qual1.number_of_seconds, qual2.number_of_seconds, ) @@ -48,14 +47,14 @@ def startstop_cmp(qual1, qual2): return iter_lex_cmp( (qual1.start_time, qual1.stop_time), (qual2.start_time, qual2.stop_time), - generic_constant_cmp + generic_constant_cmp, ) _QUALIFIER_COMPARATORS = { RepeatQualifier: repeats_cmp, WithinQualifier: within_cmp, - StartStopQualifier: startstop_cmp + StartStopQualifier: startstop_cmp, } @@ -84,14 +83,14 @@ def observation_expression_cmp(expr1, expr2): # If they're simple, use contained comparison expression order elif type1 is ObservationExpression: result = comparison_expression_cmp( - expr1.operand, expr2.operand + expr1.operand, expr2.operand, ) elif isinstance(expr1, _CompoundObservationExpression): # Both compound, and of same type (and/or/followedby): sort according # to contents. result = iter_lex_cmp( - expr1.operands, expr2.operands, observation_expression_cmp + expr1.operands, expr2.operands, observation_expression_cmp, ) else: # QualifiedObservationExpression @@ -112,13 +111,13 @@ def observation_expression_cmp(expr1, expr2): result = qual_cmp(expr1.qualifier, expr2.qualifier) else: raise TypeError( - "Can't compare qualifier type: " + qual1_type.__name__ + "Can't compare qualifier type: " + qual1_type.__name__, ) if result == 0: # Same qualifier type and details; use qualified expression order result = observation_expression_cmp( - expr1.observation_expression, expr2.observation_expression + expr1.observation_expression, expr2.observation_expression, ) return result diff --git a/stix2/equivalence/patterns/transform/__init__.py b/stix2/equivalence/patterns/transform/__init__.py index 5df9061..84a993c 100644 --- a/stix2/equivalence/patterns/transform/__init__.py +++ b/stix2/equivalence/patterns/transform/__init__.py @@ -2,6 +2,7 @@ Generic AST transformation classes. """ + class Transformer: """ Base class for AST transformers. @@ -16,7 +17,7 @@ class Transformer: is useful in situations where a transformation needs to be repeated until the AST stops changing. """ - raise NotImplemented("transform") + raise NotImplementedError("transform") class ChainTransformer(Transformer): diff --git a/stix2/equivalence/patterns/transform/comparison.py b/stix2/equivalence/patterns/transform/comparison.py index 2848598..528cc9b 100644 --- a/stix2/equivalence/patterns/transform/comparison.py +++ b/stix2/equivalence/patterns/transform/comparison.py @@ -3,18 +3,19 @@ Transformation utilities for STIX pattern comparison expressions. 
""" import functools import itertools + +from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp +from stix2.equivalence.patterns.compare.comparison import ( + comparison_expression_cmp, +) from stix2.equivalence.patterns.transform import Transformer from stix2.equivalence.patterns.transform.specials import ( - windows_reg_key, ipv4_addr, ipv6_addr + ipv4_addr, ipv6_addr, windows_reg_key, ) from stix2.patterns import ( - _BooleanExpression, _ComparisonExpression, AndBooleanExpression, - OrBooleanExpression, ParentheticalExpression + AndBooleanExpression, OrBooleanExpression, ParentheticalExpression, + _BooleanExpression, _ComparisonExpression, ) -from stix2.equivalence.patterns.compare.comparison import ( - comparison_expression_cmp -) -from stix2.equivalence.patterns.compare import iter_lex_cmp, iter_in def _dupe_ast(ast): @@ -119,7 +120,7 @@ class ComparisonExpressionTransformer(Transformer): elif isinstance(ast, _ComparisonExpression): meth = getattr( - self, "transform_comparison", self.transform_default + self, "transform_comparison", self.transform_default, ) else: @@ -156,7 +157,7 @@ class OrderDedupeTransformer( :return: The same AST node, but with sorted children """ sorted_children = sorted( - ast.operands, key=functools.cmp_to_key(comparison_expression_cmp) + ast.operands, key=functools.cmp_to_key(comparison_expression_cmp), ) deduped_children = [ @@ -165,13 +166,13 @@ class OrderDedupeTransformer( # need key wrappers in our ASTs! k.obj for k, _ in itertools.groupby( sorted_children, key=functools.cmp_to_key( - comparison_expression_cmp - ) + comparison_expression_cmp, + ), ) ] changed = iter_lex_cmp( - ast.operands, deduped_children, comparison_expression_cmp + ast.operands, deduped_children, comparison_expression_cmp, ) != 0 ast.operands = deduped_children @@ -268,7 +269,7 @@ class AbsorptionTransformer( # The simple check: is child1 contained in child2? if iter_in( - child1, child2.operands, comparison_expression_cmp + child1, child2.operands, comparison_expression_cmp, ): to_delete.add(j) @@ -278,7 +279,7 @@ class AbsorptionTransformer( if all( iter_in( child1_operand, child2.operands, - comparison_expression_cmp + comparison_expression_cmp, ) for child1_operand in child1.operands ): @@ -326,7 +327,7 @@ class DNFTransformer(ComparisonExpressionTransformer): # we should ensure each repetition is independent of the # others. _dupe_ast(sub_ast) for sub_ast in itertools.chain( - other_children, prod_seq + other_children, prod_seq, ) ]) for prod_seq in itertools.product(*or_children) diff --git a/stix2/equivalence/patterns/transform/observation.py b/stix2/equivalence/patterns/transform/observation.py index 4470706..d4ee175 100644 --- a/stix2/equivalence/patterns/transform/observation.py +++ b/stix2/equivalence/patterns/transform/observation.py @@ -3,23 +3,30 @@ Transformation utilities for STIX pattern observation expressions. 
""" import functools import itertools -from stix2.patterns import ( - ObservationExpression, AndObservationExpression, OrObservationExpression, - QualifiedObservationExpression, _CompoundObservationExpression, - ParentheticalExpression, FollowedByObservationExpression + +from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp +from stix2.equivalence.patterns.compare.observation import ( + observation_expression_cmp, ) from stix2.equivalence.patterns.transform import ( - ChainTransformer, SettleTransformer, Transformer + ChainTransformer, SettleTransformer, Transformer, ) from stix2.equivalence.patterns.transform.comparison import ( - FlattenTransformer as CFlattenTransformer, - OrderDedupeTransformer as COrderDedupeTransformer, - AbsorptionTransformer as CAbsorptionTransformer, - DNFTransformer as CDNFTransformer, - SpecialValueCanonicalization + SpecialValueCanonicalization, +) +from stix2.equivalence.patterns.transform.comparison import \ + AbsorptionTransformer as CAbsorptionTransformer +from stix2.equivalence.patterns.transform.comparison import \ + DNFTransformer as CDNFTransformer +from stix2.equivalence.patterns.transform.comparison import \ + FlattenTransformer as CFlattenTransformer +from stix2.equivalence.patterns.transform.comparison import \ + OrderDedupeTransformer as COrderDedupeTransformer +from stix2.patterns import ( + AndObservationExpression, FollowedByObservationExpression, + ObservationExpression, OrObservationExpression, ParentheticalExpression, + QualifiedObservationExpression, _CompoundObservationExpression, ) -from stix2.equivalence.patterns.compare import iter_lex_cmp, iter_in -from stix2.equivalence.patterns.compare.observation import observation_expression_cmp def _dupe_ast(ast): @@ -52,7 +59,7 @@ def _dupe_ast(ast): elif isinstance(ast, QualifiedObservationExpression): # Don't need to dupe the qualifier object at this point result = QualifiedObservationExpression( - _dupe_ast(ast.observation_expression), ast.qualifier + _dupe_ast(ast.observation_expression), ast.qualifier, ) elif isinstance(ast, ObservationExpression): @@ -100,7 +107,7 @@ class ObservationExpressionTransformer(Transformer): AndObservationExpression: "and", OrObservationExpression: "or", FollowedByObservationExpression: "followedby", - QualifiedObservationExpression: "qualified" + QualifiedObservationExpression: "qualified", } def transform(self, ast): @@ -143,7 +150,7 @@ class ObservationExpressionTransformer(Transformer): else: raise TypeError("Not an observation expression: {}: {}".format( - type(ast).__name__, str(ast) + type(ast).__name__, str(ast), )) return result, changed @@ -228,7 +235,7 @@ class OrderDedupeTransformer( def __transform(self, ast): sorted_children = sorted( - ast.operands, key=functools.cmp_to_key(observation_expression_cmp) + ast.operands, key=functools.cmp_to_key(observation_expression_cmp), ) # Deduping only applies to ORs @@ -236,15 +243,15 @@ class OrderDedupeTransformer( deduped_children = [ key.obj for key, _ in itertools.groupby( sorted_children, key=functools.cmp_to_key( - observation_expression_cmp - ) + observation_expression_cmp, + ), ) ] else: deduped_children = sorted_children changed = iter_lex_cmp( - ast.operands, deduped_children, observation_expression_cmp + ast.operands, deduped_children, observation_expression_cmp, ) != 0 ast.operands = deduped_children @@ -376,12 +383,12 @@ class AbsorptionTransformer( if isinstance( child2, ( AndObservationExpression, - FollowedByObservationExpression - ) + FollowedByObservationExpression, + ), ): # The 
simple check: is child1 contained in child2? if iter_in( - child1, child2.operands, observation_expression_cmp + child1, child2.operands, observation_expression_cmp, ): to_delete.add(j) @@ -390,11 +397,11 @@ class AbsorptionTransformer( elif type(child1) is type(child2): if isinstance(child1, AndObservationExpression): can_simplify = self.__is_contained_and( - child1.operands, child2.operands + child1.operands, child2.operands, ) else: # child1 and 2 are followedby nodes can_simplify = self.__is_contained_followedby( - child1.operands, child2.operands + child1.operands, child2.operands, ) if can_simplify: @@ -434,7 +441,7 @@ class DNFTransformer(ObservationExpressionTransformer): distributed_children = [ root_type([ _dupe_ast(sub_ast) for sub_ast in itertools.chain( - other_children, prod_seq + other_children, prod_seq, ) ]) for prod_seq in itertools.product(*or_children) @@ -477,7 +484,7 @@ class CanonicalizeComparisonExpressionsTransformer( comp_special = SpecialValueCanonicalization() comp_dnf = CDNFTransformer() self.__comp_canonicalize = ChainTransformer( - comp_special, settle_simplify, comp_dnf, settle_simplify + comp_special, settle_simplify, comp_dnf, settle_simplify, ) def transform_observation(self, ast): diff --git a/stix2/equivalence/patterns/transform/specials.py b/stix2/equivalence/patterns/transform/specials.py index 0eba091..b95e6bf 100644 --- a/stix2/equivalence/patterns/transform/specials.py +++ b/stix2/equivalence/patterns/transform/specials.py @@ -2,10 +2,10 @@ Some simple comparison expression canonicalization functions. """ import socket -from stix2.equivalence.patterns.compare.comparison import ( - object_path_to_raw_values -) +from stix2.equivalence.patterns.compare.comparison import ( + object_path_to_raw_values, +) # Values we can use as wildcards in path patterns _ANY_IDX = object() diff --git a/stix2/pattern_visitor.py b/stix2/pattern_visitor.py index a9d43c5..c4b2ec2 100644 --- a/stix2/pattern_visitor.py +++ b/stix2/pattern_visitor.py @@ -2,8 +2,8 @@ import importlib import inspect -from six import text_type +from six import text_type from stix2patterns.exceptions import ParseException from stix2patterns.grammars.STIXPatternParser import TerminalNode from stix2patterns.v20.grammars.STIXPatternParser import \ @@ -261,9 +261,11 @@ class STIXPatternVisitorForSTIX2(): property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText())) i += 2 elif isinstance(next, IntegerConstant): - property_path.append(self.instantiate("ListObjectPathComponent", - current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current), - next.value)) + property_path.append(self.instantiate( + "ListObjectPathComponent", + current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current), + next.value, + )) i += 2 else: property_path.append(current) diff --git a/stix2/test/test_pattern_equivalence.py b/stix2/test/test_pattern_equivalence.py index 0488358..c75cc60 100644 --- a/stix2/test/test_pattern_equivalence.py +++ b/stix2/test/test_pattern_equivalence.py @@ -1,8 +1,8 @@ import pytest -from stix2.equivalence.patterns import ( - equivalent_patterns, find_equivalent_patterns -) +from stix2.equivalence.patterns import ( + equivalent_patterns, find_equivalent_patterns, +) # # # # # # Observation expression equivalence tests # # @@ -13,13 +13,13 @@ from stix2.equivalence.patterns import ( "patt1, patt2", [ ( "[a:b=1] OR [a:b=1]", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1] OR [a:b=1] OR [a:b=1]", - 
"[a:b=1]" + "[a:b=1]", ), - ] + ], ) def test_obs_dupe_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -29,13 +29,13 @@ def test_obs_dupe_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1] AND [a:b=1]", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1] FOLLOWEDBY [a:b=1]", - "[a:b=1]" + "[a:b=1]", ), - ] + ], ) def test_obs_dupe_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -72,8 +72,8 @@ def test_obs_dupe_not_equivalent(patt1, patt2): ( "[a:b=1] AND ([a:b=2] AND ([a:b=3] AND [a:b=4])) AND ([a:b=5])", "([a:b=1] AND ([a:b=2] AND [a:b=3]) AND ([a:b=4] AND [a:b=5]))", - ) - ] + ), + ], ) def test_obs_flatten_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -103,7 +103,7 @@ def test_obs_flatten_equivalent(patt1, patt2): "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3]) WITHIN 2 SECONDS", "[a:b=1] WITHIN 2 SECONDS FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]", ), - ] + ], ) def test_obs_flatten_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -113,21 +113,21 @@ def test_obs_flatten_not_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1] AND [a:b=2]", - "[a:b=2] AND [a:b=1]" + "[a:b=2] AND [a:b=1]", ), ( "[a:b=1] OR [a:b=2]", - "[a:b=2] OR [a:b=1]" + "[a:b=2] OR [a:b=1]", ), ( "[a:b=1] OR ([a:b=2] AND [a:b=3])", - "([a:b=3] AND [a:b=2]) OR [a:b=1]" + "([a:b=3] AND [a:b=2]) OR [a:b=1]", ), ( "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES", - "[a:b=2] REPEATS 2 TIMES AND [a:b=1] WITHIN 2 SECONDS" - ) - ] + "[a:b=2] REPEATS 2 TIMES AND [a:b=1] WITHIN 2 SECONDS", + ), + ], ) def test_obs_order_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -137,13 +137,13 @@ def test_obs_order_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1] FOLLOWEDBY [a:b=2]", - "[a:b=2] FOLLOWEDBY [a:b=1]" + "[a:b=2] FOLLOWEDBY [a:b=1]", ), ( "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES", - "[a:b=1] REPEATS 2 TIMES AND [a:b=2] WITHIN 2 SECONDS" - ) - ] + "[a:b=1] REPEATS 2 TIMES AND [a:b=2] WITHIN 2 SECONDS", + ), + ], ) def test_obs_order_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -153,29 +153,29 @@ def test_obs_order_not_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1] OR ([a:b=1] AND [a:b=2])", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1] OR ([a:b=1] FOLLOWEDBY [a:b=2])", - "[a:b=1]" + "[a:b=1]", ), ( "([a:b=3] AND [a:b=1]) OR ([a:b=1] AND [a:b=2] AND [a:b=3])", - "[a:b=3] AND [a:b=1]" + "[a:b=3] AND [a:b=1]", ), ( "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=4] FOLLOWEDBY [a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])", - "[a:b=1] FOLLOWEDBY [a:b=3]" + "[a:b=1] FOLLOWEDBY [a:b=3]", ), ( "([a:b=1] FOLLOWEDBY [a:b=2]) OR (([a:b=1] FOLLOWEDBY [a:b=2]) AND [a:b=3])", - "[a:b=1] FOLLOWEDBY [a:b=2]" + "[a:b=1] FOLLOWEDBY [a:b=2]", ), ( "([a:b=1] AND [a:b=2]) OR (([a:b=1] AND [a:b=2]) FOLLOWEDBY [a:b=3])", - "[a:b=1] AND [a:b=2]" + "[a:b=1] AND [a:b=2]", ), - ] + ], ) def test_obs_absorb_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -185,13 +185,13 @@ def test_obs_absorb_equivalent(patt1, patt2): "patt1, patt2", [ ( "([a:b=1] AND [a:b=2]) OR ([a:b=2] AND [a:b=3] AND [a:b=4])", - "[a:b=1] AND [a:b=2]" + "[a:b=1] AND [a:b=2]", ), ( "([a:b=2] FOLLOWEDBY [a:b=1]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])", - "[a:b=2] FOLLOWEDBY [a:b=1]" - ) - ] + "[a:b=2] FOLLOWEDBY [a:b=1]", + ), + ], ) def test_obs_absorb_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -201,29 +201,29 @@ def 
test_obs_absorb_not_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1] AND ([a:b=2] OR [a:b=3])", - "([a:b=1] AND [a:b=2]) OR ([a:b=1] AND [a:b=3])" + "([a:b=1] AND [a:b=2]) OR ([a:b=1] AND [a:b=3])", ), ( "[a:b=1] FOLLOWEDBY ([a:b=2] OR [a:b=3])", - "([a:b=1] FOLLOWEDBY [a:b=2]) OR ([a:b=1] FOLLOWEDBY [a:b=3])" + "([a:b=1] FOLLOWEDBY [a:b=2]) OR ([a:b=1] FOLLOWEDBY [a:b=3])", ), ( "[a:b=1] AND ([a:b=2] AND ([a:b=3] OR [a:b=4]))", - "([a:b=1] AND [a:b=2] AND [a:b=3]) OR ([a:b=1] AND [a:b=2] AND [a:b=4])" + "([a:b=1] AND [a:b=2] AND [a:b=3]) OR ([a:b=1] AND [a:b=2] AND [a:b=4])", ), ( "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY ([a:b=3] OR [a:b=4]))", - "([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=4])" + "([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=4])", ), ( "([a:b=1] OR [a:b=2]) AND ([a:b=3] OR [a:b=4])", - "([a:b=1] AND [a:b=3]) OR ([a:b=1] AND [a:b=4]) OR ([a:b=2] AND [a:b=3]) OR ([a:b=2] AND [a:b=4])" + "([a:b=1] AND [a:b=3]) OR ([a:b=1] AND [a:b=4]) OR ([a:b=2] AND [a:b=3]) OR ([a:b=2] AND [a:b=4])", ), ( "([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=3] OR [a:b=4])", - "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])" + "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])", ), - ] + ], ) def test_obs_dnf_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -233,17 +233,17 @@ def test_obs_dnf_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1] AND [a:b=2]", - "[a:b=1] OR [a:b=2]" + "[a:b=1] OR [a:b=2]", ), ( "[a:b=1] AND ([a:b=2] OR [a:b=3])", - "([a:b=1] AND [a:b=2]) OR [a:b=3]" + "([a:b=1] AND [a:b=2]) OR [a:b=3]", ), ( "[a:b=1] WITHIN 2 SECONDS", - "[a:b=1] REPEATS 2 TIMES" - ) - ] + "[a:b=1] REPEATS 2 TIMES", + ), + ], ) def test_obs_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -258,21 +258,21 @@ def test_obs_not_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1 AND a:b=1]", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1 AND a:b=1 AND a:b=1]", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1 OR a:b=1]", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1 OR a:b=1 OR a:b=1]", - "[a:b=1]" - ) - ] + "[a:b=1]", + ), + ], ) def test_comp_dupe_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -282,29 +282,29 @@ def test_comp_dupe_equivalent(patt1, patt2): "patt1, patt2", [ ( "[(a:b=1)]", - "[a:b=1]" + "[a:b=1]", ), ( "[(((((a:b=1)))))]", - "[(a:b=1)]" + "[(a:b=1)]", ), ( "[a:b=1 AND (a:b=2 AND a:b=3)]", - "[(a:b=1 AND a:b=2) AND a:b=3]" + "[(a:b=1 AND a:b=2) AND a:b=3]", ), ( "[a:b=1 OR (a:b=2 OR a:b=3)]", - "[(a:b=1 OR a:b=2) OR a:b=3]" + "[(a:b=1 OR a:b=2) OR a:b=3]", ), ( "[(((a:b=1 AND ((a:b=2) AND a:b=3) AND (a:b=4))))]", - "[a:b=1 AND a:b=2 AND a:b=3 AND a:b=4]" + "[a:b=1 AND a:b=2 AND a:b=3 AND a:b=4]", ), ( "[(((a:b=1 OR ((a:b=2) OR a:b=3) OR (a:b=4))))]", - "[a:b=1 OR a:b=2 OR a:b=3 OR a:b=4]" + "[a:b=1 OR a:b=2 OR a:b=3 OR a:b=4]", ), - ] + ], ) def test_comp_flatten_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -314,17 +314,17 @@ def test_comp_flatten_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1 AND a:b=2]", - "[a:b=2 AND a:b=1]" + "[a:b=2 AND a:b=1]", ), ( "[a:b=1 OR a:b=2]", - "[a:b=2 OR a:b=1]" + "[a:b=2 OR a:b=1]", ), ( "[(a:b=1 OR a:b=2) AND a:b=3]", "[a:b=3 AND (a:b=2 OR a:b=1)]", - ) - ] + ), + ], ) def test_comp_order_equivalent(patt1, patt2): assert 
equivalent_patterns(patt1, patt2) @@ -334,21 +334,21 @@ def test_comp_order_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1 OR (a:b=1 AND a:b=2)]", - "[a:b=1]" + "[a:b=1]", ), ( "[a:b=1 AND (a:b=1 OR a:b=2)]", - "[a:b=1]" + "[a:b=1]", ), ( "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=2 AND a:b=1)]", - "[a:b=1 AND a:b=2]" + "[a:b=1 AND a:b=2]", ), ( "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=2 OR a:b=1)]", - "[a:b=1 OR a:b=2]" - ) - ] + "[a:b=1 OR a:b=2]", + ), + ], ) def test_comp_absorb_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -358,25 +358,25 @@ def test_comp_absorb_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1 OR (a:b=2 AND a:b=3)]", - "[(a:b=1 OR a:b=2) AND (a:b=1 OR a:b=3)]" + "[(a:b=1 OR a:b=2) AND (a:b=1 OR a:b=3)]", ), ( "[a:b=1 AND (a:b=2 OR a:b=3)]", - "[(a:b=1 AND a:b=2) OR (a:b=1 AND a:b=3)]" + "[(a:b=1 AND a:b=2) OR (a:b=1 AND a:b=3)]", ), ( "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=4)]", - "[(a:b=1 OR a:b=3) AND (a:b=1 OR a:b=4) AND (a:b=2 OR a:b=3) AND (a:b=2 OR a:b=4)]" + "[(a:b=1 OR a:b=3) AND (a:b=1 OR a:b=4) AND (a:b=2 OR a:b=3) AND (a:b=2 OR a:b=4)]", ), ( "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=4)]", - "[(a:b=1 AND a:b=3) OR (a:b=1 AND a:b=4) OR (a:b=2 AND a:b=3) OR (a:b=2 AND a:b=4)]" + "[(a:b=1 AND a:b=3) OR (a:b=1 AND a:b=4) OR (a:b=2 AND a:b=3) OR (a:b=2 AND a:b=4)]", ), ( "[a:b=1 AND (a:b=2 AND (a:b=3 OR a:b=4))]", - "[(a:b=1 AND a:b=2 AND a:b=3) OR (a:b=1 AND a:b=2 AND a:b=4)]" - ) - ] + "[(a:b=1 AND a:b=2 AND a:b=3) OR (a:b=1 AND a:b=2 AND a:b=4)]", + ), + ], ) def test_comp_dnf_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -386,17 +386,17 @@ def test_comp_dnf_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b=1]", - "[a:b=2]" + "[a:b=2]", ), ( "[a:b=1 AND a:b=2]", - "[a:b=1 OR a:b=2]" + "[a:b=1 OR a:b=2]", ), ( "[(a:b=1 AND a:b=2) OR a:b=3]", - "[a:b=1 AND (a:b=2 OR a:b=3)]" + "[a:b=1 AND (a:b=2 OR a:b=3)]", ), - ] + ], ) def test_comp_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -406,41 +406,41 @@ def test_comp_not_equivalent(patt1, patt2): "patt1, patt2", [ ( "[ipv4-addr:value='1.2.3.4/32']", - "[ipv4-addr:value='1.2.3.4']" + "[ipv4-addr:value='1.2.3.4']", ), ( "[ipv4-addr:value='1.2.3.4/24']", - "[ipv4-addr:value='1.2.3.0/24']" + "[ipv4-addr:value='1.2.3.0/24']", ), ( "[ipv4-addr:value='1.2.255.4/23']", - "[ipv4-addr:value='1.2.254.0/23']" + "[ipv4-addr:value='1.2.254.0/23']", ), ( "[ipv4-addr:value='1.2.255.4/20']", - "[ipv4-addr:value='1.2.240.0/20']" + "[ipv4-addr:value='1.2.240.0/20']", ), ( "[ipv4-addr:value='1.2.255.4/0']", - "[ipv4-addr:value='0.0.0.0/0']" + "[ipv4-addr:value='0.0.0.0/0']", ), ( "[ipv4-addr:value='01.02.03.04']", - "[ipv4-addr:value='1.2.3.4']" + "[ipv4-addr:value='1.2.3.4']", ), ( "[ipv4-addr:value='1.2.3.4/-5']", - "[ipv4-addr:value='1.2.3.4/-5']" + "[ipv4-addr:value='1.2.3.4/-5']", ), ( "[ipv4-addr:value='1.2.3.4/99']", - "[ipv4-addr:value='1.2.3.4/99']" + "[ipv4-addr:value='1.2.3.4/99']", ), ( "[ipv4-addr:value='foo']", - "[ipv4-addr:value='foo']" + "[ipv4-addr:value='foo']", ), - ] + ], ) def test_comp_special_canonicalization_ipv4(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -450,17 +450,17 @@ def test_comp_special_canonicalization_ipv4(patt1, patt2): "patt1, patt2", [ ( "[ipv4-addr:value='1.2.3.4']", - "[ipv4-addr:value='1.2.3.5']" + "[ipv4-addr:value='1.2.3.5']", ), ( "[ipv4-addr:value='1.2.3.4/1']", - "[ipv4-addr:value='1.2.3.4/2']" + "[ipv4-addr:value='1.2.3.4/2']", ), ( "[ipv4-addr:value='foo']", - "[ipv4-addr:value='bar']" + 
"[ipv4-addr:value='bar']", ), - ] + ], ) def test_comp_special_canonicalization_ipv4_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -470,45 +470,45 @@ def test_comp_special_canonicalization_ipv4_not_equivalent(patt1, patt2): "patt1, patt2", [ ( "[ipv6-addr:value='1:2:3:4:5:6:7:8/128']", - "[ipv6-addr:value='1:2:3:4:5:6:7:8']" + "[ipv6-addr:value='1:2:3:4:5:6:7:8']", ), ( "[ipv6-addr:value='1:2:3:4:5:6:7:8/112']", - "[ipv6-addr:value='1:2:3:4:5:6:7:0/112']" + "[ipv6-addr:value='1:2:3:4:5:6:7:0/112']", ), ( "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/111']", - "[ipv6-addr:value='1:2:3:4:5:6:fffe:0/111']" + "[ipv6-addr:value='1:2:3:4:5:6:fffe:0/111']", ), ( "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/104']", - "[ipv6-addr:value='1:2:3:4:5:6:ff00:0/104']" + "[ipv6-addr:value='1:2:3:4:5:6:ff00:0/104']", ), ( "[ipv6-addr:value='1:2:3:4:5:6:7:8/0']", - "[ipv6-addr:value='0:0:0:0:0:0:0:0/0']" + "[ipv6-addr:value='0:0:0:0:0:0:0:0/0']", ), ( "[ipv6-addr:value='0001:0000:0000:0000:0000:0000:0000:0001']", - "[ipv6-addr:value='1::1']" + "[ipv6-addr:value='1::1']", ), ( "[ipv6-addr:value='0000:0000:0000:0000:0000:0000:0000:0000']", - "[ipv6-addr:value='::']" + "[ipv6-addr:value='::']", ), ( "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']", - "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']" + "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']", ), ( "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']", - "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']" + "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']", ), ( "[ipv6-addr:value='foo']", - "[ipv6-addr:value='foo']" + "[ipv6-addr:value='foo']", ), - ] + ], ) def test_comp_special_canonicalization_ipv6(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -528,7 +528,7 @@ def test_comp_special_canonicalization_ipv6(patt1, patt2): "[ipv6-addr:value='foo']", "[ipv6-addr:value='bar']", ), - ] + ], ) def test_comp_special_canonicalization_ipv6_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -548,7 +548,7 @@ def test_comp_special_canonicalization_ipv6_not_equivalent(patt1, patt2): "[windows-registry-key:values[*].name = 'aaa']", "[windows-registry-key:values[*].name = 'AAA']", ), - ] + ], ) def test_comp_special_canonicalization_win_reg_key(patt1, patt2): assert equivalent_patterns(patt1, patt2) @@ -572,7 +572,7 @@ def test_comp_special_canonicalization_win_reg_key(patt1, patt2): "[windows-registry-key:values[*].data='foo']", "[windows-registry-key:values[*].data='FOO']", ), - ] + ], ) def test_comp_special_canonicalization_win_reg_key_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2) @@ -591,11 +591,11 @@ def test_find_equivalent_patterns(): "[a:b=1] OR ([a:b=2] AND [a:b=1])", "[(a:b=2 OR a:b=1) AND a:b=1]", "[c:d=1]", - "[a:b>1]" + "[a:b>1]", ] result = list( - find_equivalent_patterns(search_pattern, other_patterns) + find_equivalent_patterns(search_pattern, other_patterns), ) assert result == [ diff --git a/stix2/test/v21/test_pattern_expressions.py b/stix2/test/v21/test_pattern_expressions.py index 3ba0aa6..ac6a439 100644 --- a/stix2/test/v21/test_pattern_expressions.py +++ b/stix2/test/v21/test_pattern_expressions.py @@ -658,6 +658,7 @@ def test_parsing_integer_index(): patt_obj = create_pattern_object("[a:b[1]=2]") assert str(patt_obj) == "[a:b[1] = 2]" + # This should never occur, because the first component will always be a property_name, and they should not be quoted. 
 def test_parsing_quoted_first_path_component():
     patt_obj = create_pattern_object("[a:'b'[1]=2]")

From 320129e26c4125301b90ae44ccf8210067422591 Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Thu, 13 Aug 2020 18:45:52 -0400
Subject: [PATCH 07/10] Add another unit test to help a bit with lack of
 coverage of compare/comparison.py. This one tests patterns with more
 constant types.

---
 stix2/test/test_pattern_equivalence.py | 29 ++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/stix2/test/test_pattern_equivalence.py b/stix2/test/test_pattern_equivalence.py
index c75cc60..6fc2adf 100644
--- a/stix2/test/test_pattern_equivalence.py
+++ b/stix2/test/test_pattern_equivalence.py
@@ -578,6 +578,35 @@ def test_comp_special_canonicalization_win_reg_key_not_equivalent(patt1, patt2):
     assert not equivalent_patterns(patt1, patt2)


+def test_comp_other_constant_types():
+    constants = [
+        "1.23",
+        "1",
+        "true",
+        "false",
+        "h'4fa2'",
+        "b'ZmpoZWll'",
+        "t'1982-12-31T02:14:17.232Z'",
+    ]
+
+    pattern_template = "[a:b={}]"
+    for i, const1 in enumerate(constants):
+        for j, const2 in enumerate(constants):
+            patt1 = pattern_template.format(const1)
+            patt2 = pattern_template.format(const2)
+
+            if i == j:
+                assert equivalent_patterns(patt1, patt2)
+            else:
+                assert not equivalent_patterns(patt1, patt2)
+
+    # can't use an "=" pattern with lists...
+    for const in constants:
+        patt1 = "[a:b={}]".format(const)
+        patt2 = "[a:b IN (1,2,3)]"
+        assert not equivalent_patterns(patt1, patt2)
+
+
 # # # # #
 # find_equivalent_patterns() tests #
 # # # # #
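For reference, the behavior this test pins down can be exercised directly through the public API. A minimal sketch; the import path and function come from the test file above, and the specific patterns are illustrative:

    from stix2.equivalence.patterns import equivalent_patterns

    # Identical constants are equivalent; constants of different types are
    # not, even when they might look numerically interchangeable.
    assert equivalent_patterns("[a:b=1]", "[a:b=1]")
    assert not equivalent_patterns("[a:b=1]", "[a:b=1.23]")
    assert not equivalent_patterns("[a:b=true]", "[a:b=1]")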
From 9e707a3a81007ff697918e16cb56e7c65f1fb827 Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Fri, 14 Aug 2020 19:55:00 -0400
Subject: [PATCH 08/10] Add stix_version kwargs to the pattern equivalence
 functions. This allows the patterns to be parsed using either 2.0 or 2.1+
 syntax.

---
 stix2/equivalence/patterns/__init__.py | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/stix2/equivalence/patterns/__init__.py b/stix2/equivalence/patterns/__init__.py
index c371ca3..c792574 100644
--- a/stix2/equivalence/patterns/__init__.py
+++ b/stix2/equivalence/patterns/__init__.py
@@ -1,3 +1,4 @@
+import stix2
 from stix2.equivalence.patterns.compare.observation import (
     observation_expression_cmp,
 )
@@ -48,16 +49,22 @@ def _get_pattern_canonicalizer():
     return _pattern_canonicalizer


-def equivalent_patterns(pattern1, pattern2):
+def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION):
     """
     Determine whether two STIX patterns are semantically equivalent.

     :param pattern1: The first STIX pattern
     :param pattern2: The second STIX pattern
+    :param stix_version: The STIX version to use for pattern parsing, as a
+        string ("2.0", "2.1", etc).  Defaults to library-wide default version.
     :return: True if the patterns are semantically equivalent; False if not
     """
-    patt_ast1 = stix2.pattern_visitor.create_pattern_object(pattern1)
-    patt_ast2 = stix2.pattern_visitor.create_pattern_object(pattern2)
+    patt_ast1 = stix2.pattern_visitor.create_pattern_object(
+        pattern1, version=stix_version,
+    )
+    patt_ast2 = stix2.pattern_visitor.create_pattern_object(
+        pattern2, version=stix_version,
+    )

     pattern_canonicalizer = _get_pattern_canonicalizer()
     canon_patt1, _ = pattern_canonicalizer.transform(patt_ast1)
@@ -68,7 +75,9 @@ def equivalent_patterns(pattern1, pattern2):
     return result == 0


-def find_equivalent_patterns(search_pattern, patterns):
+def find_equivalent_patterns(
+    search_pattern, patterns, stix_version=stix2.DEFAULT_VERSION,
+):
     """
     Find patterns from a sequence which are equivalent to a given pattern.
     This is more efficient than using equivalent_patterns() in a loop, because
@@ -78,11 +87,13 @@
     :param search_pattern: A search pattern as a string
     :param patterns: An iterable over patterns as strings
+    :param stix_version: The STIX version to use for pattern parsing, as a
+        string ("2.0", "2.1", etc).  Defaults to library-wide default version.
     :return: A generator iterator producing the semantically equivalent
         patterns
     """
     search_pattern_ast = stix2.pattern_visitor.create_pattern_object(
-        search_pattern,
+        search_pattern, version=stix_version,
     )

     pattern_canonicalizer = _get_pattern_canonicalizer()
@@ -91,7 +102,9 @@
     )

     for pattern in patterns:
-        pattern_ast = stix2.pattern_visitor.create_pattern_object(pattern)
+        pattern_ast = stix2.pattern_visitor.create_pattern_object(
+            pattern, version=stix_version,
+        )
         canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast)

         result = observation_expression_cmp(
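The new kwarg affects only how the input strings are parsed; everything downstream of the AST is version-agnostic. A short usage sketch against the signatures added above (the patterns are borrowed from the version-specific test suites that follow):

    from stix2.equivalence.patterns import (
        equivalent_patterns, find_equivalent_patterns,
    )

    # STIX 2.0 patterns quote timestamps bare, so they must be parsed with
    # the 2.0 grammar.
    equivalent_patterns(
        "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
        "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
        stix_version="2.0",
    )

    # find_equivalent_patterns() takes the same kwarg and still returns a
    # generator iterator over the equivalent patterns.
    matches = find_equivalent_patterns(
        "[a:b=1]",
        ["[a:b=1 OR (a:b=1 AND a:b=2)]", "[c:d=1]"],
        stix_version="2.1",
    )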
From b6c220649189de17a529d9f185aeb325705d8d72 Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Fri, 14 Aug 2020 19:56:49 -0400
Subject: [PATCH 09/10] Add some unit test suites for pattern equivalence
 which use some STIX version-specific pattern features.

---
 stix2/test/v20/test_pattern_equivalence.py | 46 ++++++++++++++++++++++
 stix2/test/v21/test_pattern_equivalence.py | 46 ++++++++++++++++++++++
 2 files changed, 92 insertions(+)
 create mode 100644 stix2/test/v20/test_pattern_equivalence.py
 create mode 100644 stix2/test/v21/test_pattern_equivalence.py

diff --git a/stix2/test/v20/test_pattern_equivalence.py b/stix2/test/v20/test_pattern_equivalence.py
new file mode 100644
index 0000000..bf50e95
--- /dev/null
+++ b/stix2/test/v20/test_pattern_equivalence.py
@@ -0,0 +1,46 @@
+"""
+Pattern equivalence unit tests which use STIX 2.0-specific pattern features
+"""
+
+import pytest
+from stix2.equivalence.patterns import equivalent_patterns
+
+
+@pytest.mark.parametrize(
+    "patt1, patt2", [
+        (
+            "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'"
+        ),
+        (
+            "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS"
+        ),
+        (
+            "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES",
+            "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES",
+        )
+    ]
+)
+def test_startstop_equivalent(patt1, patt2):
+    assert equivalent_patterns(patt1, patt2, stix_version="2.0")
+
+
+@pytest.mark.parametrize(
+    "patt1, patt2", [
+        (
+            "[a:b!=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
+            "[a:b!=1] START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'"
+        ),
+        (
+            "[a:b<1] REPEATS 2 TIMES START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
+            "[a:b<1] REPEATS 2 TIMES START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'"
+        ),
+        (
+            "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES",
+            "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES",
+        )
+    ]
+)
+def test_startstop_not_equivalent(patt1, patt2):
+    assert not equivalent_patterns(patt1, patt2, stix_version="2.0")
diff --git a/stix2/test/v21/test_pattern_equivalence.py b/stix2/test/v21/test_pattern_equivalence.py
new file mode 100644
index 0000000..e7bd5b4
--- /dev/null
+++ b/stix2/test/v21/test_pattern_equivalence.py
@@ -0,0 +1,46 @@
+"""
+Pattern equivalence unit tests which use STIX 2.1+-specific pattern features
+"""
+
+import pytest
+from stix2.equivalence.patterns import equivalent_patterns
+
+
+@pytest.mark.parametrize(
+    "patt1, patt2", [
+        (
+            "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'"
+        ),
+        (
+            "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS"
+        ),
+        (
+            "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES",
+            "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES",
+        )
+    ]
+)
+def test_startstop_equivalent(patt1, patt2):
+    assert equivalent_patterns(patt1, patt2, stix_version="2.1")
+
+
+@pytest.mark.parametrize(
+    "patt1, patt2", [
+        (
+            "[a:b!=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+            "[a:b!=1] START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'"
+        ),
+        (
+            "[a:b<1] REPEATS 2 TIMES START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+            "[a:b<1] REPEATS 2 TIMES START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'"
+        ),
+        (
+            "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES",
+            "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES",
+        )
+    ]
+)
+def test_startstop_not_equivalent(patt1, patt2):
+    assert not equivalent_patterns(patt1, patt2, stix_version="2.1")

From b5015b74ba3add1818bee6de433a07469f9b39ba Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Wed, 19 Aug 2020 12:10:51 -0400
Subject: [PATCH 10/10] pre-commit stylistic fixes

---
 stix2/test/v20/test_pattern_equivalence.py | 17 +++++++++--------
 stix2/test/v21/test_pattern_equivalence.py | 17 +++++++++--------
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/stix2/test/v20/test_pattern_equivalence.py b/stix2/test/v20/test_pattern_equivalence.py
index bf50e95..1ada5c7 100644
--- a/stix2/test/v20/test_pattern_equivalence.py
+++ b/stix2/test/v20/test_pattern_equivalence.py
@@ -3,6 +3,7 @@ Pattern equivalence unit tests which use STIX 2.0-specific pattern features
 """

 import pytest
+
 from stix2.equivalence.patterns import equivalent_patterns


@@ -10,17 +11,17 @@ from stix2.equivalence.patterns import equivalent_patterns
     "patt1, patt2", [
         (
             "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
-            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'"
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
         ),
         (
             "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
-            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS"
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
         ),
         (
             "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES",
             "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES",
-        )
-    ]
+        ),
+    ],
 )
 def test_startstop_equivalent(patt1, patt2):
     assert equivalent_patterns(patt1, patt2, stix_version="2.0")
@@ -30,17 +31,17 @@ def test_startstop_equivalent(patt1, patt2):
     "patt1, patt2", [
         (
             "[a:b!=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
-            "[a:b!=1] START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'"
+            "[a:b!=1] START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'",
         ),
         (
             "[a:b<1] REPEATS 2 TIMES START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'",
-            "[a:b<1] REPEATS 2 TIMES START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'"
+            "[a:b<1] REPEATS 2 TIMES START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'",
         ),
         (
             "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES",
             "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES",
-        )
-    ]
+        ),
+    ],
 )
 def test_startstop_not_equivalent(patt1, patt2):
     assert not equivalent_patterns(patt1, patt2, stix_version="2.0")
diff --git a/stix2/test/v21/test_pattern_equivalence.py b/stix2/test/v21/test_pattern_equivalence.py
index e7bd5b4..71ded69 100644
--- a/stix2/test/v21/test_pattern_equivalence.py
+++ b/stix2/test/v21/test_pattern_equivalence.py
@@ -3,6 +3,7 @@ Pattern equivalence unit tests which use STIX 2.1+-specific pattern features
 """

 import pytest
+
 from stix2.equivalence.patterns import equivalent_patterns


@@ -10,17 +11,17 @@ from stix2.equivalence.patterns import equivalent_patterns
     "patt1, patt2", [
         (
             "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
-            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'"
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
         ),
         (
             "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
-            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS"
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
        ),
         (
"([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES", "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES", - ) - ] + ), + ], ) def test_startstop_equivalent(patt1, patt2): assert equivalent_patterns(patt1, patt2, stix_version="2.1") @@ -30,17 +31,17 @@ def test_startstop_equivalent(patt1, patt2): "patt1, patt2", [ ( "[a:b!=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'", - "[a:b!=1] START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'" + "[a:b!=1] START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'", ), ( "[a:b<1] REPEATS 2 TIMES START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'", - "[a:b<1] REPEATS 2 TIMES START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'" + "[a:b<1] REPEATS 2 TIMES START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'", ), ( "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES", "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES", - ) - ] + ), + ], ) def test_startstop_not_equivalent(patt1, patt2): assert not equivalent_patterns(patt1, patt2, stix_version="2.1")