diff --git a/requirements.txt b/requirements.txt index 2fb7c5d..8a5e4a2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ bumpversion ipython nbsphinx==0.4.3 pre-commit +pygments<3,>=2.4.1 pytest pytest-cov sphinx<2 diff --git a/stix2/equivalence/__init__.py b/stix2/equivalence/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stix2/equivalence/patterns/__init__.py b/stix2/equivalence/patterns/__init__.py new file mode 100644 index 0000000..c792574 --- /dev/null +++ b/stix2/equivalence/patterns/__init__.py @@ -0,0 +1,115 @@ +import stix2 +from stix2.equivalence.patterns.compare.observation import ( + observation_expression_cmp, +) +from stix2.equivalence.patterns.transform import ( + ChainTransformer, SettleTransformer, +) +from stix2.equivalence.patterns.transform.observation import ( + AbsorptionTransformer, CanonicalizeComparisonExpressionsTransformer, + DNFTransformer, FlattenTransformer, OrderDedupeTransformer, +) +import stix2.pattern_visitor + +# Lazy-initialize +_pattern_canonicalizer = None + + +def _get_pattern_canonicalizer(): + """ + Get a canonicalization transformer for STIX patterns. + + :return: The transformer + """ + + # The transformers are either stateless or contain no state which changes + # with each use. So we can set up the transformers once and keep reusing + # them. + global _pattern_canonicalizer + + if not _pattern_canonicalizer: + canonicalize_comp_expr = \ + CanonicalizeComparisonExpressionsTransformer() + + obs_expr_flatten = FlattenTransformer() + obs_expr_order = OrderDedupeTransformer() + obs_expr_absorb = AbsorptionTransformer() + obs_simplify = ChainTransformer( + obs_expr_flatten, obs_expr_order, obs_expr_absorb, + ) + obs_settle_simplify = SettleTransformer(obs_simplify) + + obs_dnf = DNFTransformer() + + _pattern_canonicalizer = ChainTransformer( + canonicalize_comp_expr, + obs_settle_simplify, obs_dnf, obs_settle_simplify, + ) + + return _pattern_canonicalizer + + +def equivalent_patterns(pattern1, pattern2, stix_version=stix2.DEFAULT_VERSION): + """ + Determine whether two STIX patterns are semantically equivalent. + + :param pattern1: The first STIX pattern + :param pattern2: The second STIX pattern + :param stix_version: The STIX version to use for pattern parsing, as a + string ("2.0", "2.1", etc). Defaults to library-wide default version. + :return: True if the patterns are semantically equivalent; False if not + """ + patt_ast1 = stix2.pattern_visitor.create_pattern_object( + pattern1, version=stix_version, + ) + patt_ast2 = stix2.pattern_visitor.create_pattern_object( + pattern2, version=stix_version, + ) + + pattern_canonicalizer = _get_pattern_canonicalizer() + canon_patt1, _ = pattern_canonicalizer.transform(patt_ast1) + canon_patt2, _ = pattern_canonicalizer.transform(patt_ast2) + + result = observation_expression_cmp(canon_patt1, canon_patt2) + + return result == 0 + + +def find_equivalent_patterns( + search_pattern, patterns, stix_version=stix2.DEFAULT_VERSION, +): + """ + Find patterns from a sequence which are equivalent to a given pattern. + This is more efficient than using equivalent_patterns() in a loop, because + it doesn't re-canonicalize the search pattern over and over. This works + on an input iterable and is implemented as a generator of matches. So you + can "stream" patterns in and matching patterns will be streamed out.
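+ + Example (patterns drawn from this PR's tests; the first candidate is + equivalent to the search pattern by OR-deduplication, the second is not): + + >>> list(find_equivalent_patterns( + ... "[a:b=1]", + ... ["[a:b=1] OR [a:b=1]", "[a:b=1] AND [a:b=1]"], + ... )) + ['[a:b=1] OR [a:b=1]']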
+ + :param search_pattern: A search pattern as a string + :param patterns: An iterable over patterns as strings + :param stix_version: The STIX version to use for pattern parsing, as a + string ("2.0", "2.1", etc). Defaults to library-wide default version. + :return: A generator iterator producing the semantically equivalent + patterns + """ + search_pattern_ast = stix2.pattern_visitor.create_pattern_object( + search_pattern, version=stix_version, + ) + + pattern_canonicalizer = _get_pattern_canonicalizer() + canon_search_pattern_ast, _ = pattern_canonicalizer.transform( + search_pattern_ast, + ) + + for pattern in patterns: + pattern_ast = stix2.pattern_visitor.create_pattern_object( + pattern, version=stix_version, + ) + canon_pattern_ast, _ = pattern_canonicalizer.transform(pattern_ast) + + result = observation_expression_cmp( + canon_search_pattern_ast, canon_pattern_ast, + ) + + if result == 0: + yield pattern diff --git a/stix2/equivalence/patterns/compare/__init__.py b/stix2/equivalence/patterns/compare/__init__.py new file mode 100644 index 0000000..e4bcc8f --- /dev/null +++ b/stix2/equivalence/patterns/compare/__init__.py @@ -0,0 +1,91 @@ +""" +Some generic comparison utility functions. +""" + + +def generic_cmp(value1, value2): + """ + Generic comparator of values which uses the builtin '<' and '>' operators. + Assumes the values can be compared that way. + + :param value1: The first value + :param value2: The second value + :return: -1, 0, or 1 depending on whether value1 is less, equal, or greater + than value2 + """ + + return -1 if value1 < value2 else 1 if value1 > value2 else 0 + + +def iter_lex_cmp(seq1, seq2, cmp): + """ + Generic lexicographical compare function, which works on two iterables and + a comparator function. + + :param seq1: The first iterable + :param seq2: The second iterable + :param cmp: a two-arg callable comparator for values iterated over. It + must behave analogously to this function, returning <0, 0, or >0 to + express the ordering of the two values. + :return: <0 if seq1 < seq2; >0 if seq1 > seq2; 0 if they're equal + """ + + it1 = iter(seq1) + it2 = iter(seq2) + + it1_exhausted = it2_exhausted = False + while True: + try: + val1 = next(it1) + except StopIteration: + it1_exhausted = True + + try: + val2 = next(it2) + except StopIteration: + it2_exhausted = True + + # same length, all elements equal + if it1_exhausted and it2_exhausted: + result = 0 + break + + # one is a prefix of the other; the shorter one is less + elif it1_exhausted: + result = -1 + break + + elif it2_exhausted: + result = 1 + break + + # neither is exhausted; check values + else: + val_cmp = cmp(val1, val2) + + if val_cmp != 0: + result = val_cmp + break + + return result + + +def iter_in(value, seq, cmp): + """ + A function behaving like the "in" Python operator, but which works with + a comparator function. This function checks whether the given value is + contained in the given iterable.
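+ + For example, using generic_cmp from this module as the comparator: + + >>> iter_in("b", ["a", "b", "c"], generic_cmp) + True + >>> iter_in("d", ["a", "b", "c"], generic_cmp) + False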
+ + :param value: A value + :param seq: An iterable + :param cmp: A 2-arg comparator function which must return 0 if the args + are equal + :return: True if the value is found in the iterable, False if it is not + """ + result = False + for seq_val in seq: + if cmp(value, seq_val) == 0: + result = True + break + + return result diff --git a/stix2/equivalence/patterns/compare/comparison.py b/stix2/equivalence/patterns/compare/comparison.py new file mode 100644 index 0000000..ed717fc --- /dev/null +++ b/stix2/equivalence/patterns/compare/comparison.py @@ -0,0 +1,351 @@ +""" +Comparison utilities for STIX pattern comparison expressions. +""" +import base64 +import functools + +from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp +from stix2.patterns import ( + AndBooleanExpression, BinaryConstant, BooleanConstant, FloatConstant, + HexConstant, IntegerConstant, ListConstant, ListObjectPathComponent, + OrBooleanExpression, StringConstant, TimestampConstant, + _ComparisonExpression, +) + +_COMPARISON_OP_ORDER = ( + "=", "!=", "<>", "<", "<=", ">", ">=", + "IN", "LIKE", "MATCHES", "ISSUBSET", "ISSUPERSET", +) + + +_CONSTANT_TYPE_ORDER = ( + # ints/floats come first, but have special handling since the types are + # treated equally as a generic "number" type. So they aren't in this list. + # See constant_cmp(). + StringConstant, BooleanConstant, + TimestampConstant, HexConstant, BinaryConstant, ListConstant, +) + + +def generic_constant_cmp(const1, const2): + """ + Generic comparator for most _Constant instances. They must have a "value" + attribute whose value supports the builtin comparison operators. + + :param const1: The first _Constant instance + :param const2: The second _Constant instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + return generic_cmp(const1.value, const2.value) + + +def bool_cmp(value1, value2): + """ + Compare two boolean constants. + + :param value1: The first BooleanConstant instance + :param value2: The second BooleanConstant instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # unwrap from _Constant instances + value1 = value1.value + value2 = value2.value + + if (value1 and value2) or (not value1 and not value2): + result = 0 + + # Let's say... True < False? + elif value1: + result = -1 + + else: + result = 1 + + return result + + +def hex_cmp(value1, value2): + """ + Compare two STIX "hex" values. This decodes to bytes and compares that. + It does *not* do a string compare on the hex representations. + + :param value1: The first HexConstant + :param value2: The second HexConstant + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + bytes1 = bytes.fromhex(value1.value) + bytes2 = bytes.fromhex(value2.value) + + return generic_cmp(bytes1, bytes2) + + +def bin_cmp(value1, value2): + """ + Compare two STIX "binary" values. This decodes to bytes and compares that. + It does *not* do a string compare on the base64 representations. + + :param value1: The first BinaryConstant + :param value2: The second BinaryConstant + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + bytes1 = base64.standard_b64decode(value1.value) + bytes2 = base64.standard_b64decode(value2.value) + + return generic_cmp(bytes1, bytes2) + + +def list_cmp(value1, value2): + """ + Compare lists order-insensitively. 
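+ + For example (assuming ListConstant instances built from lists of + constants, per stix2.patterns): + + >>> list_cmp( + ... ListConstant([IntegerConstant(1), IntegerConstant(2)]), + ... ListConstant([IntegerConstant(2), IntegerConstant(1)]), + ... ) + 0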
+ + :param value1: The first ListConstant + :param value2: The second ListConstant + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # Achieve order-independence by sorting the lists first. + sorted_value1 = sorted( + value1.value, key=functools.cmp_to_key(constant_cmp), + ) + + sorted_value2 = sorted( + value2.value, key=functools.cmp_to_key(constant_cmp), + ) + + result = iter_lex_cmp(sorted_value1, sorted_value2, constant_cmp) + + return result + + +_CONSTANT_COMPARATORS = { + # We have special handling for ints/floats, so no entries for those AST + # classes here. See constant_cmp(). + StringConstant: generic_constant_cmp, + BooleanConstant: bool_cmp, + TimestampConstant: generic_constant_cmp, + HexConstant: hex_cmp, + BinaryConstant: bin_cmp, + ListConstant: list_cmp, +} + + +def object_path_component_cmp(comp1, comp2): + """ + Compare a string/int to another string/int; this induces an ordering over + all strings and ints. It is used to perform a lexicographical sort on + object paths. + + Ints and strings compare as usual to each other; ints compare less than + strings. + + :param comp1: An object path component (string or int) + :param comp2: An object path component (string or int) + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # both ints or both strings: use builtin comparison operators + if (isinstance(comp1, int) and isinstance(comp2, int)) \ + or (isinstance(comp1, str) and isinstance(comp2, str)): + result = generic_cmp(comp1, comp2) + + # one is int, one is string. Let's say ints come before strings. + elif isinstance(comp1, int): + result = -1 + + else: + result = 1 + + return result + + +def object_path_to_raw_values(path): + """ + Converts the given ObjectPath instance to a list of strings and ints. + All property names become strings, regardless of whether they're *_ref + properties; "*" index steps become that string; and numeric index steps + become integers. + + :param path: An ObjectPath instance + :return: A generator iterator over the values + """ + + for comp in path.property_path: + if isinstance(comp, ListObjectPathComponent): + yield comp.property_name + + if comp.index == "*" or isinstance(comp.index, int): + yield comp.index + else: + # in case the index is a stringified int; convert to an actual + # int + yield int(comp.index) + + else: + yield comp.property_name + + +def object_path_cmp(path1, path2): + """ + Compare two object paths. + + :param path1: The first ObjectPath instance + :param path2: The second ObjectPath instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + if path1.object_type_name < path2.object_type_name: + result = -1 + + elif path1.object_type_name > path2.object_type_name: + result = 1 + + else: + # I always thought of key and index path steps as separate. The AST + # lumps indices in with the previous key as a single path component. + # The following splits the path components into individual comparable + # values again. Maybe I should not do this... + path_vals1 = object_path_to_raw_values(path1) + path_vals2 = object_path_to_raw_values(path2) + result = iter_lex_cmp( + path_vals1, path_vals2, object_path_component_cmp, + ) + + return result + + +def comparison_operator_cmp(op1, op2): + """ + Compare two comparison operators. 
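+ + For example, "=" precedes "LIKE" in _COMPARISON_OP_ORDER above: + + >>> comparison_operator_cmp("=", "LIKE") + -1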
+ + :param op1: The first comparison operator (a string) + :param op2: The second comparison operator (a string) + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + op1_idx = _COMPARISON_OP_ORDER.index(op1) + op2_idx = _COMPARISON_OP_ORDER.index(op2) + + result = generic_cmp(op1_idx, op2_idx) + + return result + + +def constant_cmp(value1, value2): + """ + Compare two constants. + + :param value1: The first _Constant instance + :param value2: The second _Constant instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + # Special handling for ints/floats: treat them generically as numbers, + # ordered before all other types. + if isinstance(value1, (IntegerConstant, FloatConstant)) \ + and isinstance(value2, (IntegerConstant, FloatConstant)): + result = generic_constant_cmp(value1, value2) + + elif isinstance(value1, (IntegerConstant, FloatConstant)): + result = -1 + + elif isinstance(value2, (IntegerConstant, FloatConstant)): + result = 1 + + else: + + type1 = type(value1) + type2 = type(value2) + + type1_idx = _CONSTANT_TYPE_ORDER.index(type1) + type2_idx = _CONSTANT_TYPE_ORDER.index(type2) + + result = generic_cmp(type1_idx, type2_idx) + if result == 0: + # Types are the same; must compare values + cmp_func = _CONSTANT_COMPARATORS.get(type1) + if not cmp_func: + raise TypeError("Don't know how to compare " + type1.__name__) + + result = cmp_func(value1, value2) + + return result + + +def simple_comparison_expression_cmp(expr1, expr2): + """ + Compare "simple" comparison expressions: those which aren't AND/OR + combinations, just comparisons. + + :param expr1: first _ComparisonExpression instance + :param expr2: second _ComparisonExpression instance + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + + result = object_path_cmp(expr1.lhs, expr2.lhs) + + if result == 0: + result = comparison_operator_cmp(expr1.operator, expr2.operator) + + if result == 0: + # _ComparisonExpression's have a "negated" attribute. Umm... + # non-negated < negated? + if not expr1.negated and expr2.negated: + result = -1 + elif expr1.negated and not expr2.negated: + result = 1 + + if result == 0: + result = constant_cmp(expr1.rhs, expr2.rhs) + + return result + + +def comparison_expression_cmp(expr1, expr2): + """ + Compare two comparison expressions. This is sensitive to the order of the + expressions' sub-components. To achieve an order-insensitive comparison, + the ASTs must be canonically ordered first. + + :param expr1: The first comparison expression + :param expr2: The second comparison expression + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + if isinstance(expr1, _ComparisonExpression) \ + and isinstance(expr2, _ComparisonExpression): + result = simple_comparison_expression_cmp(expr1, expr2) + + # One is simple, one is compound. Let's say... simple ones come first? + elif isinstance(expr1, _ComparisonExpression): + result = -1 + + elif isinstance(expr2, _ComparisonExpression): + result = 1 + + # Both are compound: AND's before OR's? + elif isinstance(expr1, AndBooleanExpression) \ + and isinstance(expr2, OrBooleanExpression): + result = -1 + + elif isinstance(expr1, OrBooleanExpression) \ + and isinstance(expr2, AndBooleanExpression): + result = 1 + + else: + # Both compound, same boolean operator: sort according to contents. 
+ # This will order according to recursive invocations of this comparator, + # on sub-expressions. + result = iter_lex_cmp( + expr1.operands, expr2.operands, comparison_expression_cmp, + ) + + return result diff --git a/stix2/equivalence/patterns/compare/observation.py b/stix2/equivalence/patterns/compare/observation.py new file mode 100644 index 0000000..227b8ae --- /dev/null +++ b/stix2/equivalence/patterns/compare/observation.py @@ -0,0 +1,123 @@ +""" +Comparison utilities for STIX pattern observation expressions. +""" +from stix2.equivalence.patterns.compare import generic_cmp, iter_lex_cmp +from stix2.equivalence.patterns.compare.comparison import ( + comparison_expression_cmp, generic_constant_cmp, +) +from stix2.patterns import ( + AndObservationExpression, FollowedByObservationExpression, + ObservationExpression, OrObservationExpression, + QualifiedObservationExpression, RepeatQualifier, StartStopQualifier, + WithinQualifier, _CompoundObservationExpression, +) + +_OBSERVATION_EXPRESSION_TYPE_ORDER = ( + ObservationExpression, AndObservationExpression, OrObservationExpression, + FollowedByObservationExpression, QualifiedObservationExpression, +) + + +_QUALIFIER_TYPE_ORDER = ( + RepeatQualifier, WithinQualifier, StartStopQualifier, +) + + +def repeats_cmp(qual1, qual2): + """ + Compare REPEATS qualifiers. This orders by repeat count. + """ + return generic_constant_cmp(qual1.times_to_repeat, qual2.times_to_repeat) + + +def within_cmp(qual1, qual2): + """ + Compare WITHIN qualifiers. This orders by number of seconds. + """ + return generic_constant_cmp( + qual1.number_of_seconds, qual2.number_of_seconds, + ) + + +def startstop_cmp(qual1, qual2): + """ + Compare START/STOP qualifiers. This lexicographically orders by start time, + then stop time. + """ + return iter_lex_cmp( + (qual1.start_time, qual1.stop_time), + (qual2.start_time, qual2.stop_time), + generic_constant_cmp, + ) + + +_QUALIFIER_COMPARATORS = { + RepeatQualifier: repeats_cmp, + WithinQualifier: within_cmp, + StartStopQualifier: startstop_cmp, +} + + +def observation_expression_cmp(expr1, expr2): + """ + Compare two observation expression ASTs. This is sensitive to the order of + the expressions' sub-components. To achieve an order-insensitive + comparison, the ASTs must be canonically ordered first. + + :param expr1: The first observation expression + :param expr2: The second observation expression + :return: <0, 0, or >0 depending on whether the first arg is less, equal or + greater than the second + """ + type1 = type(expr1) + type2 = type(expr2) + + type1_idx = _OBSERVATION_EXPRESSION_TYPE_ORDER.index(type1) + type2_idx = _OBSERVATION_EXPRESSION_TYPE_ORDER.index(type2) + + if type1_idx != type2_idx: + result = generic_cmp(type1_idx, type2_idx) + + # else, both exprs are of same type. + + # If they're simple, use contained comparison expression order + elif type1 is ObservationExpression: + result = comparison_expression_cmp( + expr1.operand, expr2.operand, + ) + + elif isinstance(expr1, _CompoundObservationExpression): + # Both compound, and of same type (and/or/followedby): sort according + # to contents. + result = iter_lex_cmp( + expr1.operands, expr2.operands, observation_expression_cmp, + ) + + else: # QualifiedObservationExpression + # Both qualified. Check qualifiers first; if they are the same, + # use order of the qualified expressions. 
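+ # E.g. a REPEATS qualifier orders before a WITHIN qualifier per + # _QUALIFIER_TYPE_ORDER; two REPEATS qualifiers fall back to their + # repeat counts, and only then to the qualified expressions themselves.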
+ qual1_type = type(expr1.qualifier) + qual2_type = type(expr2.qualifier) + + qual1_type_idx = _QUALIFIER_TYPE_ORDER.index(qual1_type) + qual2_type_idx = _QUALIFIER_TYPE_ORDER.index(qual2_type) + + result = generic_cmp(qual1_type_idx, qual2_type_idx) + + if result == 0: + # Same qualifier type; compare qualifier details + qual_cmp = _QUALIFIER_COMPARATORS.get(qual1_type) + if qual_cmp: + result = qual_cmp(expr1.qualifier, expr2.qualifier) + else: + raise TypeError( + "Can't compare qualifier type: " + qual1_type.__name__, + ) + + if result == 0: + # Same qualifier type and details; use qualified expression order + result = observation_expression_cmp( + expr1.observation_expression, expr2.observation_expression, + ) + + return result diff --git a/stix2/equivalence/patterns/transform/__init__.py b/stix2/equivalence/patterns/transform/__init__.py new file mode 100644 index 0000000..84a993c --- /dev/null +++ b/stix2/equivalence/patterns/transform/__init__.py @@ -0,0 +1,57 @@ +""" +Generic AST transformation classes. +""" + + +class Transformer: + """ + Base class for AST transformers. + """ + def transform(self, ast): + """ + Transform the given AST and return the resulting AST. + + :param ast: The AST to transform + :return: A 2-tuple: the transformed AST and a boolean indicating whether + the transformation actually changed anything. The change detection + is useful in situations where a transformation needs to be repeated + until the AST stops changing. + """ + raise NotImplementedError("transform") + + +class ChainTransformer(Transformer): + """ + A composite transformer which consists of a sequence of sub-transformers. + Applying this transformer applies all sub-transformers in sequence, as + a group. + """ + def __init__(self, *transformers): + self.__transformers = transformers + + def transform(self, ast): + changed = False + for transformer in self.__transformers: + ast, this_changed = transformer.transform(ast) + if this_changed: + changed = True + + return ast, changed + + +class SettleTransformer(Transformer): + """ + A transformer that repeatedly performs a transformation until that + transformation no longer changes the AST. I.e. the AST has "settled". + """ + def __init__(self, transform): + self.__transformer = transform + + def transform(self, ast): + changed = False + ast, this_changed = self.__transformer.transform(ast) + while this_changed: + changed = True + ast, this_changed = self.__transformer.transform(ast) + + return ast, changed diff --git a/stix2/equivalence/patterns/transform/comparison.py b/stix2/equivalence/patterns/transform/comparison.py new file mode 100644 index 0000000..528cc9b --- /dev/null +++ b/stix2/equivalence/patterns/transform/comparison.py @@ -0,0 +1,378 @@ +""" +Transformation utilities for STIX pattern comparison expressions. +""" +import functools +import itertools + +from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp +from stix2.equivalence.patterns.compare.comparison import ( + comparison_expression_cmp, +) +from stix2.equivalence.patterns.transform import Transformer +from stix2.equivalence.patterns.transform.specials import ( + ipv4_addr, ipv6_addr, windows_reg_key, +) +from stix2.patterns import ( + AndBooleanExpression, OrBooleanExpression, ParentheticalExpression, + _BooleanExpression, _ComparisonExpression, +) + + +def _dupe_ast(ast): + """ + Create a duplicate of the given AST. + + Note: the comparison expression "leaves", i.e. simple + comparisons are currently not duplicated. 
I don't think it's necessary as + of this writing; they are never changed. But revisit this if/when + necessary. + + :param ast: The AST to duplicate + :return: The duplicate AST + """ + if isinstance(ast, AndBooleanExpression): + result = AndBooleanExpression([ + _dupe_ast(operand) for operand in ast.operands + ]) + + elif isinstance(ast, OrBooleanExpression): + result = OrBooleanExpression([ + _dupe_ast(operand) for operand in ast.operands + ]) + + elif isinstance(ast, _ComparisonExpression): + # Change this to create a dupe, if we ever need to change simple + # comparison expressions as part of canonicalization. + result = ast + + else: + raise TypeError("Can't duplicate " + type(ast).__name__) + + return result + + +class ComparisonExpressionTransformer(Transformer): + """ + Transformer base class with special support for transforming comparison + expressions. The transform method implemented here performs a bottom-up + in-place transformation, with support for some comparison + expression-specific callbacks. + + Specifically, subclasses can implement methods: + "transform_or" for OR nodes + "transform_and" for AND nodes + "transform_comparison" for plain comparison nodes (<path> <op> <value>) + "transform_default" for all types of nodes + + "transform_default" is a fallback, if a type-specific callback is not + found. The default implementation does nothing to the AST. The + type-specific callbacks are preferred over the default, if both exist. + + In all cases, the callbacks are called with an AST for a subtree rooted at + the appropriate node type, where the subtree's children have already been + transformed. They must return the same thing as the base transform() + method: a 2-tuple with the transformed AST and a boolean for change + detection. See doc for the superclass' method. + + This process currently silently drops parenthetical nodes. + """ + + def transform(self, ast): + if isinstance(ast, _BooleanExpression): + changed = False + for i, operand in enumerate(ast.operands): + operand_result, this_changed = self.transform(operand) + if this_changed: + changed = True + + ast.operands[i] = operand_result + + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, _ComparisonExpression): + result, changed = self.__dispatch_transform(ast) + + elif isinstance(ast, ParentheticalExpression): + # Drop these + result, changed = self.transform(ast.expression) + + else: + raise TypeError("Not a comparison expression: " + str(ast)) + + return result, changed + + def __dispatch_transform(self, ast): + """ + Invoke a transformer callback method based on the given ast root node + type. + + :param ast: The AST + :return: The callback's result + """ + + if isinstance(ast, AndBooleanExpression): + meth = getattr(self, "transform_and", self.transform_default) + + elif isinstance(ast, OrBooleanExpression): + meth = getattr(self, "transform_or", self.transform_default) + + elif isinstance(ast, _ComparisonExpression): + meth = getattr( + self, "transform_comparison", self.transform_default, + ) + + else: + meth = self.transform_default + + return meth(ast) + + def transform_default(self, ast): + """ + Override to handle transforming AST nodes which don't have a more + specific method implemented. + """ + return ast, False + + +class OrderDedupeTransformer( + ComparisonExpressionTransformer ): + """ + Canonically order the children of all nodes in the AST. Because the + deduping algorithm is based on sorted data, this transformation also does + deduping.
+ + E.g.: + A and A => A + A or A => A + """ + + def __transform(self, ast): + """ + Sort/dedupe children. AND and OR can be treated identically. + + :param ast: The comparison expression AST + :return: The same AST node, but with sorted children + """ + sorted_children = sorted( + ast.operands, key=functools.cmp_to_key(comparison_expression_cmp), + ) + + deduped_children = [ + # Apparently when using a key function, groupby()'s "keys" are the + # key wrappers, not actual sequence values. Obviously we don't + # need key wrappers in our ASTs! + k.obj for k, _ in itertools.groupby( + sorted_children, key=functools.cmp_to_key( + comparison_expression_cmp, + ), + ) + ] + + changed = iter_lex_cmp( + ast.operands, deduped_children, comparison_expression_cmp, + ) != 0 + + ast.operands = deduped_children + + return ast, changed + + def transform_or(self, ast): + return self.__transform(ast) + + def transform_and(self, ast): + return self.__transform(ast) + + +class FlattenTransformer(ComparisonExpressionTransformer): + """ + Flatten all nodes of the AST. E.g.: + + A and (B and C) => A and B and C + A or (B or C) => A or B or C + (A) => A + """ + + def __transform(self, ast): + """ + Flatten children. AND and OR can be treated mostly identically. The + little difference is that we can absorb AND children if we're an AND + ourselves; and OR for OR. + + :param ast: The comparison expression AST + :return: The same AST node, but with flattened children + """ + + changed = False + if len(ast.operands) == 1: + # Replace an AND/OR with one child, with the child itself. + ast = ast.operands[0] + changed = True + + else: + flat_operands = [] + for operand in ast.operands: + if isinstance(operand, _BooleanExpression) \ + and ast.operator == operand.operator: + flat_operands.extend(operand.operands) + changed = True + + else: + flat_operands.append(operand) + + ast.operands = flat_operands + + return ast, changed + + def transform_or(self, ast): + return self.__transform(ast) + + def transform_and(self, ast): + return self.__transform(ast) + + +class AbsorptionTransformer( + ComparisonExpressionTransformer +): + """ + Applies boolean "absorption" rules for AST simplification. E.g.: + + A and (A or B) = A + A or (A and B) = A + """ + + def __transform(self, ast): + + changed = False + secondary_op = "AND" if ast.operator == "OR" else "OR" + + to_delete = set() + + # Check i (child1) against j to see if we can delete j. + for i, child1 in enumerate(ast.operands): + if i in to_delete: + continue + + for j, child2 in enumerate(ast.operands): + if i == j or j in to_delete: + continue + + # We're checking if child1 is contained in child2, so + # child2 has to be a compound object, not just a simple + # comparison expression. We also require the right operator + # for child2: "AND" if ast is "OR" and vice versa. + if not isinstance(child2, _BooleanExpression) \ + or child2.operator != secondary_op: + continue + + # The simple check: is child1 contained in child2? + if iter_in( + child1, child2.operands, comparison_expression_cmp, + ): + to_delete.add(j) + + # A more complicated check: does child1 occur in child2 + # in a "flattened" form? 
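+ # E.g. with an OR parent: child1 = (A AND B) is "contained in" + # child2 = (A AND B AND C), so child2 is absorbed (deleted).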
+ elif child1.operator == child2.operator: + if all( + iter_in( + child1_operand, child2.operands, + comparison_expression_cmp, + ) + for child1_operand in child1.operands + ): + to_delete.add(j) + + if to_delete: + changed = True + + for i in reversed(sorted(to_delete)): + del ast.operands[i] + + return ast, changed + + def transform_or(self, ast): + return self.__transform(ast) + + def transform_and(self, ast): + return self.__transform(ast) + + +class DNFTransformer(ComparisonExpressionTransformer): + """ + Convert a comparison expression AST to DNF. E.g.: + + A and (B or C) => (A and B) or (A and C) + """ + def transform_and(self, ast): + or_children = [] + other_children = [] + changed = False + + # Sort AND children into two piles: the ORs and everything else + for child in ast.operands: + if isinstance(child, _BooleanExpression) and child.operator == "OR": + # Need a list of operand lists, so we can compute the + # product below. + or_children.append(child.operands) + else: + other_children.append(child) + + if or_children: + distributed_children = [ + AndBooleanExpression([ + # Make dupes: distribution implies adding repetition, and + # we should ensure each repetition is independent of the + # others. + _dupe_ast(sub_ast) for sub_ast in itertools.chain( + other_children, prod_seq, + ) + ]) + for prod_seq in itertools.product(*or_children) + ] + + # Need to recursively continue to distribute AND over OR in + # any of our new sub-expressions which need it. This causes + # more downward recursion in the midst of this bottom-up transform. + # It's not good for performance. I wonder if a top-down + # transformation algorithm would make more sense in this phase? + # But then we'd be using two different algorithms for the same + # thing... Maybe this transform should be completely top-down + # (no bottom-up component at all)? + distributed_children = [ + self.transform(child)[0] for child in distributed_children + ] + + result = OrBooleanExpression(distributed_children) + changed = True + + else: + # No AND-over-OR; nothing to do + result = ast + + return result, changed + + +class SpecialValueCanonicalization(ComparisonExpressionTransformer): + """ + Try to find particular leaf-node comparison expressions whose rhs (i.e. the + constant) can be canonicalized. This is an idiosyncratic transformation + based on some ideas people had for context-sensitive semantic equivalence + in constant values. + """ + def transform_comparison(self, ast): + if ast.lhs.object_type_name == "windows-registry-key": + windows_reg_key(ast) + + elif ast.lhs.object_type_name == "ipv4-addr": + ipv4_addr(ast) + + elif ast.lhs.object_type_name == "ipv6-addr": + ipv6_addr(ast) + + # Hard-code False here since this particular canonicalization is never + # worth doing more than once. I think it's okay to pretend nothing has + # changed. + return ast, False diff --git a/stix2/equivalence/patterns/transform/observation.py b/stix2/equivalence/patterns/transform/observation.py new file mode 100644 index 0000000..d4ee175 --- /dev/null +++ b/stix2/equivalence/patterns/transform/observation.py @@ -0,0 +1,495 @@ +""" +Transformation utilities for STIX pattern observation expressions. 
+""" +import functools +import itertools + +from stix2.equivalence.patterns.compare import iter_in, iter_lex_cmp +from stix2.equivalence.patterns.compare.observation import ( + observation_expression_cmp, +) +from stix2.equivalence.patterns.transform import ( + ChainTransformer, SettleTransformer, Transformer, +) +from stix2.equivalence.patterns.transform.comparison import ( + SpecialValueCanonicalization, +) +from stix2.equivalence.patterns.transform.comparison import \ + AbsorptionTransformer as CAbsorptionTransformer +from stix2.equivalence.patterns.transform.comparison import \ + DNFTransformer as CDNFTransformer +from stix2.equivalence.patterns.transform.comparison import \ + FlattenTransformer as CFlattenTransformer +from stix2.equivalence.patterns.transform.comparison import \ + OrderDedupeTransformer as COrderDedupeTransformer +from stix2.patterns import ( + AndObservationExpression, FollowedByObservationExpression, + ObservationExpression, OrObservationExpression, ParentheticalExpression, + QualifiedObservationExpression, _CompoundObservationExpression, +) + + +def _dupe_ast(ast): + """ + Create a duplicate of the given AST. The AST root must be an observation + expression of some kind (AND/OR/qualified, etc). + + Note: the observation expression "leaves", i.e. simple square-bracket + observation expressions are currently not duplicated. I don't think it's + necessary as of this writing. But revisit this if/when necessary. + + :param ast: The AST to duplicate + :return: The duplicate AST + """ + if isinstance(ast, AndObservationExpression): + result = AndObservationExpression([ + _dupe_ast(child) for child in ast.operands + ]) + + elif isinstance(ast, OrObservationExpression): + result = OrObservationExpression([ + _dupe_ast(child) for child in ast.operands + ]) + + elif isinstance(ast, FollowedByObservationExpression): + result = FollowedByObservationExpression([ + _dupe_ast(child) for child in ast.operands + ]) + + elif isinstance(ast, QualifiedObservationExpression): + # Don't need to dupe the qualifier object at this point + result = QualifiedObservationExpression( + _dupe_ast(ast.observation_expression), ast.qualifier, + ) + + elif isinstance(ast, ObservationExpression): + result = ast + + else: + raise TypeError("Can't duplicate " + type(ast).__name__) + + return result + + +class ObservationExpressionTransformer(Transformer): + """ + Transformer base class with special support for transforming observation + expressions. The transform method implemented here performs a bottom-up + in-place transformation, with support for some observation + expression-specific callbacks. It recurses down as far as the "leaf node" + observation expressions; it does not go inside of them, to the individual + components of a comparison expression. + + Specifically, subclasses can implement methods: + "transform_or" for OR nodes + "transform_and" for AND nodes + "transform_followedby" for FOLLOWEDBY nodes + "transform_qualified" for qualified nodes (all qualifier types) + "transform_observation" for "leaf" observation expression nodes + "transform_default" for all types of nodes + + "transform_default" is a fallback, if a type-specific callback is not + found. The default implementation does nothing to the AST. The + type-specific callbacks are preferred over the default, if both exist. + + In all cases, the callbacks are called with an AST for a subtree rooted at + the appropriate node type, where the AST's children have already been + transformed. 
They must return the same thing as the base transform() + method: a 2-tuple with the transformed AST and a boolean for change + detection. See doc for the superclass' method. + + This process currently silently drops parenthetical nodes. + """ + + # Determines how AST node types map to callback method names + _DISPATCH_NAME_MAP = { + ObservationExpression: "observation", + AndObservationExpression: "and", + OrObservationExpression: "or", + FollowedByObservationExpression: "followedby", + QualifiedObservationExpression: "qualified", + } + + def transform(self, ast): + + changed = False + if isinstance(ast, ObservationExpression): + # A "leaf node" for observation expressions. We don't recurse into + # these. + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, _CompoundObservationExpression): + for i, operand in enumerate(ast.operands): + result, this_changed = self.transform(operand) + if this_changed: + ast.operands[i] = result + changed = True + + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, QualifiedObservationExpression): + # I don't think we need to process/transform the qualifier by + # itself, do we? + result, this_changed = self.transform(ast.observation_expression) + if this_changed: + ast.observation_expression = result + changed = True + + result, this_changed = self.__dispatch_transform(ast) + if this_changed: + changed = True + + elif isinstance(ast, ParentheticalExpression): + result, _ = self.transform(ast.expression) + # Dropping a node is a change, right? + changed = True + + else: + raise TypeError("Not an observation expression: {}: {}".format( + type(ast).__name__, str(ast), + )) + + return result, changed + + def __dispatch_transform(self, ast): + """ + Invoke a transformer callback method based on the given ast root node + type. + + :param ast: The AST + :return: The callback's result + """ + + dispatch_name = self._DISPATCH_NAME_MAP.get(type(ast)) + if dispatch_name: + meth_name = "transform_" + dispatch_name + meth = getattr(self, meth_name, self.transform_default) + else: + meth = self.transform_default + + return meth(ast) + + def transform_default(self, ast): + return ast, False + + +class FlattenTransformer(ObservationExpressionTransformer): + """ + Flatten an observation expression AST. E.g.: + + A and (B and C) => A and B and C + A or (B or C) => A or B or C + A followedby (B followedby C) => A followedby B followedby C + (A) => A + """ + + def __transform(self, ast): + + changed = False + + if len(ast.operands) == 1: + # Replace an AND/OR/FOLLOWEDBY with one child, with the child + # itself. + result = ast.operands[0] + changed = True + + else: + flat_children = [] + for operand in ast.operands: + if isinstance(operand, _CompoundObservationExpression) \ + and ast.operator == operand.operator: + flat_children.extend(operand.operands) + changed = True + else: + flat_children.append(operand) + + ast.operands = flat_children + result = ast + + return result, changed + + def transform_and(self, ast): + return self.__transform(ast) + + def transform_or(self, ast): + return self.__transform(ast) + + def transform_followedby(self, ast): + return self.__transform(ast) + + +class OrderDedupeTransformer( + ObservationExpressionTransformer +): + """ + Canonically order AND/OR expressions, and dedupe ORs. 
E.g.: + + A or A => A + B or A => A or B + B and A => A and B + """ + + def __transform(self, ast): + sorted_children = sorted( + ast.operands, key=functools.cmp_to_key(observation_expression_cmp), + ) + + # Deduping only applies to ORs + if ast.operator == "OR": + deduped_children = [ + key.obj for key, _ in itertools.groupby( + sorted_children, key=functools.cmp_to_key( + observation_expression_cmp, + ), + ) + ] + else: + deduped_children = sorted_children + + changed = iter_lex_cmp( + ast.operands, deduped_children, observation_expression_cmp, + ) != 0 + + ast.operands = deduped_children + + return ast, changed + + def transform_and(self, ast): + return self.__transform(ast) + + def transform_or(self, ast): + return self.__transform(ast) + + +class AbsorptionTransformer( + ObservationExpressionTransformer +): + """ + Applies boolean "absorption" rules for observation expressions, for AST + simplification: + + A or (A and B) = A + A or (A followedby B) = A + + Other variants do not hold for observation expressions. + """ + + def __is_contained_and(self, exprs_containee, exprs_container): + """ + Determine whether the "containee" expressions are contained in the + "container" expressions, with AND semantics (order-independent but need + distinct bindings). For example (with containee on left and container + on right): + + (A and A and B) or (A and B and C) + + In the above, all of the lhs vars have a counterpart in the rhs, but + there are two A's on the left and only one on the right. Therefore, + the right does not "contain" the left. You would need two A's on the + right. + + :param exprs_containee: The expressions we want to check for containment + :param exprs_container: The expressions acting as the "container" + :return: True if the containee is contained in the container; False if + not + """ + + # make our own list we are free to manipulate without affecting the + # function args. + container = list(exprs_container) + + result = True + for ee in exprs_containee: + for i, er in enumerate(container): + if observation_expression_cmp(ee, er) == 0: + # Found a match in the container; delete it so we never try + # to match a container expr to two different containee + # expressions. + del container[i] + break + else: + result = False + break + + return result + + def __is_contained_followedby(self, exprs_containee, exprs_container): + """ + Determine whether the "containee" expressions are contained in the + "container" expressions, with FOLLOWEDBY semantics (order-sensitive and + need distinct bindings). For example (with containee on left and + container on right): + + (A followedby B) or (B followedby A) + + In the above, all of the lhs vars have a counterpart in the rhs, but + the vars on the right are not in the same order. Therefore, the right + does not "contain" the left. The container vars don't have to be + contiguous though. E.g. in: + + (A followedby B) or (D followedby A followedby C followedby B) + + in the container (rhs), B follows A, so it "contains" the lhs even + though there is other stuff mixed in. 
+ + :param exprs_containee: The expressions we want to check for containment + :param exprs_container: The expressions acting as the "container" + :return: True if the containee is contained in the container; False if + not + """ + + ee_iter = iter(exprs_containee) + er_iter = iter(exprs_container) + + result = True + while True: + ee = next(ee_iter, None) + if not ee: + break + + while True: + er = next(er_iter, None) + if er: + if observation_expression_cmp(ee, er) == 0: + break + else: + break + + if not er: + result = False + break + + return result + + def transform_or(self, ast): + changed = False + to_delete = set() + for i, child1 in enumerate(ast.operands): + if i in to_delete: + continue + + # The simplification doesn't work across qualifiers + if isinstance(child1, QualifiedObservationExpression): + continue + + for j, child2 in enumerate(ast.operands): + if i == j or j in to_delete: + continue + + if isinstance( + child2, ( + AndObservationExpression, + FollowedByObservationExpression, + ), + ): + # The simple check: is child1 contained in child2? + if iter_in( + child1, child2.operands, observation_expression_cmp, + ): + to_delete.add(j) + + # A more complicated check: does child1 occur in child2 + # in a "flattened" form? + elif type(child1) is type(child2): + if isinstance(child1, AndObservationExpression): + can_simplify = self.__is_contained_and( + child1.operands, child2.operands, + ) + else: # child1 and 2 are followedby nodes + can_simplify = self.__is_contained_followedby( + child1.operands, child2.operands, + ) + + if can_simplify: + to_delete.add(j) + + if to_delete: + changed = True + + for i in reversed(sorted(to_delete)): + del ast.operands[i] + + return ast, changed + + +class DNFTransformer(ObservationExpressionTransformer): + """ + Transform an observation expression to DNF. This will distribute AND and + FOLLOWEDBY over OR: + + A and (B or C) => (A and B) or (A and C) + A followedby (B or C) => (A followedby B) or (A followedby C) + """ + + def __transform(self, ast): + + root_type = type(ast) # will be AST class for AND or FOLLOWEDBY + changed = False + or_children = [] + other_children = [] + for child in ast.operands: + if isinstance(child, OrObservationExpression): + or_children.append(child.operands) + else: + other_children.append(child) + + if or_children: + distributed_children = [ + root_type([ + _dupe_ast(sub_ast) for sub_ast in itertools.chain( + other_children, prod_seq, + ) + ]) + for prod_seq in itertools.product(*or_children) + ] + + # Need to recursively continue to distribute AND/FOLLOWEDBY over OR + # in any of our new sub-expressions which need it. + distributed_children = [ + self.transform(child)[0] for child in distributed_children + ] + + result = OrObservationExpression(distributed_children) + changed = True + + else: + result = ast + + return result, changed + + def transform_and(self, ast): + return self.__transform(ast) + + def transform_followedby(self, ast): + return self.__transform(ast) + + +class CanonicalizeComparisonExpressionsTransformer( + ObservationExpressionTransformer +): + """ + Canonicalize all comparison expressions. 
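+ + As set up in __init__() below, this chains special-value canonicalization, + a settled flatten/order/absorb simplification pass, DNF conversion, and a + final settled simplification pass, applied to the comparison expression + inside each observation expression.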
+ """ + def __init__(self): + comp_flatten = CFlattenTransformer() + comp_order = COrderDedupeTransformer() + comp_absorb = CAbsorptionTransformer() + simplify = ChainTransformer(comp_flatten, comp_order, comp_absorb) + settle_simplify = SettleTransformer(simplify) + + comp_special = SpecialValueCanonicalization() + comp_dnf = CDNFTransformer() + self.__comp_canonicalize = ChainTransformer( + comp_special, settle_simplify, comp_dnf, settle_simplify, + ) + + def transform_observation(self, ast): + comp_expr = ast.operand + canon_comp_expr, changed = self.__comp_canonicalize.transform(comp_expr) + ast.operand = canon_comp_expr + + return ast, changed diff --git a/stix2/equivalence/patterns/transform/specials.py b/stix2/equivalence/patterns/transform/specials.py new file mode 100644 index 0000000..b95e6bf --- /dev/null +++ b/stix2/equivalence/patterns/transform/specials.py @@ -0,0 +1,227 @@ +""" +Some simple comparison expression canonicalization functions. +""" +import socket + +from stix2.equivalence.patterns.compare.comparison import ( + object_path_to_raw_values, +) + +# Values we can use as wildcards in path patterns +_ANY_IDX = object() +_ANY_KEY = object() +_ANY = object() + + +def _path_is(object_path, path_pattern): + """ + Compare an object path against a pattern. This enables simple path + recognition based on a pattern, which is slightly more flexible than exact + equality: it supports some simple wildcards. + + The path pattern must be an iterable of values: strings for key path steps, + ints or "*" for index path steps, or wildcards. Exact matches are required + for non-wildcards in the pattern. For the wildcards, _ANY_IDX matches any + index path step; _ANY_KEY matches any key path step, and _ANY matches any + path step. + + :param object_path: An ObjectPath instance + :param path_pattern: An iterable giving the pattern path steps + :return: True if the path matches the pattern; False if not + """ + path_values = object_path_to_raw_values(object_path) + + path_iter = iter(path_values) + patt_iter = iter(path_pattern) + + result = True + while True: + path_val = next(path_iter, None) + patt_val = next(patt_iter, None) + + if path_val is None and patt_val is None: + # equal length sequences; no differences found + break + + elif path_val is None or patt_val is None: + # unequal length sequences + result = False + break + + elif patt_val is _ANY_IDX: + if not isinstance(path_val, int) and path_val != "*": + result = False + break + + elif patt_val is _ANY_KEY: + if not isinstance(path_val, str): + result = False + break + + elif patt_val is not _ANY and patt_val != path_val: + result = False + break + + return result + + +def _mask_bytes(ip_bytes, prefix_size): + """ + Retain the high-order 'prefix_size' bits from ip_bytes, and zero out the + remaining low-order bits. This side-effects ip_bytes. + + :param ip_bytes: A mutable byte sequence (e.g. a bytearray) + :param prefix_size: An integer prefix size + """ + addr_size_bytes = len(ip_bytes) + addr_size_bits = 8 * addr_size_bytes + + assert 0 <= prefix_size <= addr_size_bits + + num_fixed_bytes = prefix_size // 8 + num_zero_bytes = (addr_size_bits - prefix_size) // 8 + + if num_zero_bytes > 0: + ip_bytes[addr_size_bytes - num_zero_bytes:] = b"\x00" * num_zero_bytes + + if num_fixed_bytes + num_zero_bytes != addr_size_bytes: + # The address boundary doesn't fall on a byte boundary. + # So we have a byte for which we have to zero out some + # bits. 
+ num_1_bits = prefix_size % 8 + mask = ((1 << num_1_bits) - 1) << (8 - num_1_bits) + ip_bytes[num_fixed_bytes] &= mask + + +def windows_reg_key(comp_expr): + """ + Lower-cases the rhs, depending on the windows-registry-key property + being compared. This enables case-insensitive comparisons between two + patterns, for those values. This side-effects the given AST. + + :param comp_expr: A _ComparisonExpression object whose type is + windows-registry-key + """ + if _path_is(comp_expr.lhs, ("key",)) \ + or _path_is(comp_expr.lhs, ("values", _ANY_IDX, "name")): + comp_expr.rhs.value = comp_expr.rhs.value.lower() + + +def ipv4_addr(comp_expr): + """ + Canonicalizes a CIDR IPv4 address by zeroing out low-order bits, according + to the prefix size. This affects the rhs when the "value" property of an + ipv4-addr is being compared. If the prefix size is 32, the size suffix is + simply dropped since it's redundant. If the value is not a valid CIDR + address, then no change is made. This also runs the address through the + platform's IPv4 address processing functions (inet_aton() and inet_ntoa()), + which can adjust the format. + + This side-effects the given AST. + + :param comp_expr: A _ComparisonExpression object whose type is ipv4-addr. + """ + if _path_is(comp_expr.lhs, ("value",)): + value = comp_expr.rhs.value + slash_idx = value.find("/") + is_cidr = slash_idx >= 0 + + if is_cidr: + ip_str = value[:slash_idx] + else: + ip_str = value + + try: + ip_bytes = socket.inet_aton(ip_str) + except OSError: + # illegal IPv4 address string + return + + if is_cidr: + try: + prefix_size = int(value[slash_idx+1:]) + except ValueError: + # illegal prefix size + return + + if prefix_size < 0 or prefix_size > 32: + # illegal prefix size + return + + if not is_cidr or prefix_size == 32: + # If a CIDR with prefix size 32, drop the prefix size since it's + # redundant. Run the address bytes through inet_ntoa() in case it + # would adjust the format (e.g. drop leading zeros: + # 1.2.3.004 => 1.2.3.4). + value = socket.inet_ntoa(ip_bytes) + + else: + # inet_aton() gives an immutable 'bytes' value; we need a value + # we can change. + ip_bytes = bytearray(ip_bytes) + _mask_bytes(ip_bytes, prefix_size) + + ip_str = socket.inet_ntoa(ip_bytes) + value = ip_str + "/" + str(prefix_size) + + comp_expr.rhs.value = value + + +def ipv6_addr(comp_expr): + """ + Canonicalizes a CIDR IPv6 address by zeroing out low-order bits, according + to the prefix size. This affects the rhs when the "value" property of an + ipv6-addr is being compared. If the prefix size is 128, the size suffix is + simply dropped since it's redundant. If the value is not a valid CIDR + address, then no change is made. This also runs the address through the + platform's IPv6 address processing functions (inet_pton() and inet_ntop()), + which can adjust the format. + + This side-effects the given AST. + + :param comp_expr: A _ComparisonExpression object whose type is ipv6-addr. 
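+ + For example, "1:2:3:4:5:6:7:8/128" becomes "1:2:3:4:5:6:7:8", and + "1:2:3:4:5:6:7:8/64" becomes "1:2:3:4::/64".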
+ """ + if _path_is(comp_expr.lhs, ("value",)): + value = comp_expr.rhs.value + slash_idx = value.find("/") + is_cidr = slash_idx >= 0 + + if is_cidr: + ip_str = value[:slash_idx] + else: + ip_str = value + + try: + ip_bytes = socket.inet_pton(socket.AF_INET6, ip_str) + except OSError: + # illegal IPv6 address string + return + + if is_cidr: + try: + prefix_size = int(value[slash_idx+1:]) + except ValueError: + # illegal prefix size + return + + if prefix_size < 0 or prefix_size > 128: + # illegal prefix size + return + + if not is_cidr or prefix_size == 128: + # If a CIDR with prefix size 128, drop the prefix size since it's + # redundant. Run the IP address through inet_ntop() so it can + # reformat with the double-colons (and make any other adjustments) + # if necessary. + value = socket.inet_ntop(socket.AF_INET6, ip_bytes) + + else: + # inet_pton() gives an immutable 'bytes' value; we need a value + # we can change. + ip_bytes = bytearray(ip_bytes) + _mask_bytes(ip_bytes, prefix_size) + + ip_str = socket.inet_ntop(socket.AF_INET6, ip_bytes) + value = ip_str + "/" + str(prefix_size) + + comp_expr.rhs.value = value diff --git a/stix2/pattern_visitor.py b/stix2/pattern_visitor.py index a9d43c5..c4b2ec2 100644 --- a/stix2/pattern_visitor.py +++ b/stix2/pattern_visitor.py @@ -2,8 +2,8 @@ import importlib import inspect -from six import text_type +from six import text_type from stix2patterns.exceptions import ParseException from stix2patterns.grammars.STIXPatternParser import TerminalNode from stix2patterns.v20.grammars.STIXPatternParser import \ @@ -261,9 +261,11 @@ class STIXPatternVisitorForSTIX2(): property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText())) i += 2 elif isinstance(next, IntegerConstant): - property_path.append(self.instantiate("ListObjectPathComponent", - current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current), - next.value)) + property_path.append(self.instantiate( + "ListObjectPathComponent", + current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current), + next.value, + )) i += 2 else: property_path.append(current) diff --git a/stix2/test/test_pattern_equivalence.py b/stix2/test/test_pattern_equivalence.py new file mode 100644 index 0000000..6fc2adf --- /dev/null +++ b/stix2/test/test_pattern_equivalence.py @@ -0,0 +1,634 @@ +import pytest + +from stix2.equivalence.patterns import ( + equivalent_patterns, find_equivalent_patterns, +) + +# # # # +# # Observation expression equivalence tests # # +# # # # + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] OR [a:b=1]", + "[a:b=1]", + ), + ( + "[a:b=1] OR [a:b=1] OR [a:b=1]", + "[a:b=1]", + ), + ], +) +def test_obs_dupe_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND [a:b=1]", + "[a:b=1]", + ), + ( + "[a:b=1] FOLLOWEDBY [a:b=1]", + "[a:b=1]", + ), + ], +) +def test_obs_dupe_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ("[a:b=1]", "([a:b=1])"), + ("(((([a:b=1]))))", "([a:b=1])"), + ( + "[a:b=1] AND ([a:b=2] AND [a:b=3])", + "[a:b=1] AND [a:b=2] AND [a:b=3]", + ), + ( + "([a:b=1] AND [a:b=2]) AND [a:b=3]", + "[a:b=1] AND ([a:b=2] AND [a:b=3])", + ), + ( + "[a:b=1] OR ([a:b=2] OR [a:b=3])", + "[a:b=1] OR [a:b=2] OR [a:b=3]", + ), + ( + "([a:b=1] OR [a:b=2]) OR [a:b=3]", + "[a:b=1] OR ([a:b=2] OR [a:b=3])", + ), + ( + 
"[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3])", + "[a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]", + ), + ( + "([a:b=1] FOLLOWEDBY [a:b=2]) FOLLOWEDBY [a:b=3]", + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3])", + ), + ( + "[a:b=1] AND ([a:b=2] AND ([a:b=3] AND [a:b=4])) AND ([a:b=5])", + "([a:b=1] AND ([a:b=2] AND [a:b=3]) AND ([a:b=4] AND [a:b=5]))", + ), + ], +) +def test_obs_flatten_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "([a:b=1] AND [a:b=2]) OR [a:b=3]", + "[a:b=1] AND ([a:b=2] OR [a:b=3])", + ), + ( + "([a:b=1] OR [a:b=2]) FOLLOWEDBY [a:b=3]", + "[a:b=1] OR ([a:b=2] FOLLOWEDBY [a:b=3])", + ), + ("[a:b=1]", "([a:b=1]) REPEATS 2 TIMES"), + ("(((([a:b=1]))))", "([a:b=1] REPEATS 2 TIMES)"), + ( + "[a:b=1] AND ([a:b=2] AND [a:b=3]) WITHIN 2 SECONDS", + "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] AND [a:b=3]", + ), + ( + "[a:b=1] OR ([a:b=2] OR [a:b=3]) WITHIN 2 SECONDS", + "[a:b=1] WITHIN 2 SECONDS OR [a:b=2] OR [a:b=3]", + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY [a:b=3]) WITHIN 2 SECONDS", + "[a:b=1] WITHIN 2 SECONDS FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]", + ), + ], +) +def test_obs_flatten_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND [a:b=2]", + "[a:b=2] AND [a:b=1]", + ), + ( + "[a:b=1] OR [a:b=2]", + "[a:b=2] OR [a:b=1]", + ), + ( + "[a:b=1] OR ([a:b=2] AND [a:b=3])", + "([a:b=3] AND [a:b=2]) OR [a:b=1]", + ), + ( + "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES", + "[a:b=2] REPEATS 2 TIMES AND [a:b=1] WITHIN 2 SECONDS", + ), + ], +) +def test_obs_order_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] FOLLOWEDBY [a:b=2]", + "[a:b=2] FOLLOWEDBY [a:b=1]", + ), + ( + "[a:b=1] WITHIN 2 SECONDS AND [a:b=2] REPEATS 2 TIMES", + "[a:b=1] REPEATS 2 TIMES AND [a:b=2] WITHIN 2 SECONDS", + ), + ], +) +def test_obs_order_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] OR ([a:b=1] AND [a:b=2])", + "[a:b=1]", + ), + ( + "[a:b=1] OR ([a:b=1] FOLLOWEDBY [a:b=2])", + "[a:b=1]", + ), + ( + "([a:b=3] AND [a:b=1]) OR ([a:b=1] AND [a:b=2] AND [a:b=3])", + "[a:b=3] AND [a:b=1]", + ), + ( + "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=4] FOLLOWEDBY [a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])", + "[a:b=1] FOLLOWEDBY [a:b=3]", + ), + ( + "([a:b=1] FOLLOWEDBY [a:b=2]) OR (([a:b=1] FOLLOWEDBY [a:b=2]) AND [a:b=3])", + "[a:b=1] FOLLOWEDBY [a:b=2]", + ), + ( + "([a:b=1] AND [a:b=2]) OR (([a:b=1] AND [a:b=2]) FOLLOWEDBY [a:b=3])", + "[a:b=1] AND [a:b=2]", + ), + ], +) +def test_obs_absorb_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "([a:b=1] AND [a:b=2]) OR ([a:b=2] AND [a:b=3] AND [a:b=4])", + "[a:b=1] AND [a:b=2]", + ), + ( + "([a:b=2] FOLLOWEDBY [a:b=1]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3])", + "[a:b=2] FOLLOWEDBY [a:b=1]", + ), + ], +) +def test_obs_absorb_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND ([a:b=2] OR [a:b=3])", + "([a:b=1] AND [a:b=2]) OR ([a:b=1] AND [a:b=3])", + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] OR [a:b=3])", + "([a:b=1] FOLLOWEDBY [a:b=2]) OR ([a:b=1] FOLLOWEDBY [a:b=3])", + ), + ( + "[a:b=1] AND ([a:b=2] AND ([a:b=3] OR 
[a:b=4]))", + "([a:b=1] AND [a:b=2] AND [a:b=3]) OR ([a:b=1] AND [a:b=2] AND [a:b=4])", + ), + ( + "[a:b=1] FOLLOWEDBY ([a:b=2] FOLLOWEDBY ([a:b=3] OR [a:b=4]))", + "([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=2] FOLLOWEDBY [a:b=4])", + ), + ( + "([a:b=1] OR [a:b=2]) AND ([a:b=3] OR [a:b=4])", + "([a:b=1] AND [a:b=3]) OR ([a:b=1] AND [a:b=4]) OR ([a:b=2] AND [a:b=3]) OR ([a:b=2] AND [a:b=4])", + ), + ( + "([a:b=1] OR [a:b=2]) FOLLOWEDBY ([a:b=3] OR [a:b=4])", + "([a:b=1] FOLLOWEDBY [a:b=3]) OR ([a:b=1] FOLLOWEDBY [a:b=4]) OR ([a:b=2] FOLLOWEDBY [a:b=3]) OR ([a:b=2] FOLLOWEDBY [a:b=4])", + ), + ], +) +def test_obs_dnf_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] AND [a:b=2]", + "[a:b=1] OR [a:b=2]", + ), + ( + "[a:b=1] AND ([a:b=2] OR [a:b=3])", + "([a:b=1] AND [a:b=2]) OR [a:b=3]", + ), + ( + "[a:b=1] WITHIN 2 SECONDS", + "[a:b=1] REPEATS 2 TIMES", + ), + ], +) +def test_obs_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +# # # # +# # Comparison expression equivalence tests # # +# # # # + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 AND a:b=1]", + "[a:b=1]", + ), + ( + "[a:b=1 AND a:b=1 AND a:b=1]", + "[a:b=1]", + ), + ( + "[a:b=1 OR a:b=1]", + "[a:b=1]", + ), + ( + "[a:b=1 OR a:b=1 OR a:b=1]", + "[a:b=1]", + ), + ], +) +def test_comp_dupe_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[(a:b=1)]", + "[a:b=1]", + ), + ( + "[(((((a:b=1)))))]", + "[(a:b=1)]", + ), + ( + "[a:b=1 AND (a:b=2 AND a:b=3)]", + "[(a:b=1 AND a:b=2) AND a:b=3]", + ), + ( + "[a:b=1 OR (a:b=2 OR a:b=3)]", + "[(a:b=1 OR a:b=2) OR a:b=3]", + ), + ( + "[(((a:b=1 AND ((a:b=2) AND a:b=3) AND (a:b=4))))]", + "[a:b=1 AND a:b=2 AND a:b=3 AND a:b=4]", + ), + ( + "[(((a:b=1 OR ((a:b=2) OR a:b=3) OR (a:b=4))))]", + "[a:b=1 OR a:b=2 OR a:b=3 OR a:b=4]", + ), + ], +) +def test_comp_flatten_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 AND a:b=2]", + "[a:b=2 AND a:b=1]", + ), + ( + "[a:b=1 OR a:b=2]", + "[a:b=2 OR a:b=1]", + ), + ( + "[(a:b=1 OR a:b=2) AND a:b=3]", + "[a:b=3 AND (a:b=2 OR a:b=1)]", + ), + ], +) +def test_comp_order_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 OR (a:b=1 AND a:b=2)]", + "[a:b=1]", + ), + ( + "[a:b=1 AND (a:b=1 OR a:b=2)]", + "[a:b=1]", + ), + ( + "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=2 AND a:b=1)]", + "[a:b=1 AND a:b=2]", + ), + ( + "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=2 OR a:b=1)]", + "[a:b=1 OR a:b=2]", + ), + ], +) +def test_comp_absorb_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1 OR (a:b=2 AND a:b=3)]", + "[(a:b=1 OR a:b=2) AND (a:b=1 OR a:b=3)]", + ), + ( + "[a:b=1 AND (a:b=2 OR a:b=3)]", + "[(a:b=1 AND a:b=2) OR (a:b=1 AND a:b=3)]", + ), + ( + "[(a:b=1 AND a:b=2) OR (a:b=3 AND a:b=4)]", + "[(a:b=1 OR a:b=3) AND (a:b=1 OR a:b=4) AND (a:b=2 OR a:b=3) AND (a:b=2 OR a:b=4)]", + ), + ( + "[(a:b=1 OR a:b=2) AND (a:b=3 OR a:b=4)]", + "[(a:b=1 AND a:b=3) OR (a:b=1 AND a:b=4) OR (a:b=2 AND a:b=3) OR (a:b=2 AND a:b=4)]", + ), + ( + "[a:b=1 AND (a:b=2 AND (a:b=3 OR a:b=4))]", + "[(a:b=1 AND a:b=2 AND a:b=3) OR (a:b=1 AND a:b=2 AND a:b=4)]", + ), + ], +) +def test_comp_dnf_equivalent(patt1, patt2): 
+ assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1]", + "[a:b=2]", + ), + ( + "[a:b=1 AND a:b=2]", + "[a:b=1 OR a:b=2]", + ), + ( + "[(a:b=1 AND a:b=2) OR a:b=3]", + "[a:b=1 AND (a:b=2 OR a:b=3)]", + ), + ], +) +def test_comp_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv4-addr:value='1.2.3.4/32']", + "[ipv4-addr:value='1.2.3.4']", + ), + ( + "[ipv4-addr:value='1.2.3.4/24']", + "[ipv4-addr:value='1.2.3.0/24']", + ), + ( + "[ipv4-addr:value='1.2.255.4/23']", + "[ipv4-addr:value='1.2.254.0/23']", + ), + ( + "[ipv4-addr:value='1.2.255.4/20']", + "[ipv4-addr:value='1.2.240.0/20']", + ), + ( + "[ipv4-addr:value='1.2.255.4/0']", + "[ipv4-addr:value='0.0.0.0/0']", + ), + ( + "[ipv4-addr:value='01.02.03.04']", + "[ipv4-addr:value='1.2.3.4']", + ), + ( + "[ipv4-addr:value='1.2.3.4/-5']", + "[ipv4-addr:value='1.2.3.4/-5']", + ), + ( + "[ipv4-addr:value='1.2.3.4/99']", + "[ipv4-addr:value='1.2.3.4/99']", + ), + ( + "[ipv4-addr:value='foo']", + "[ipv4-addr:value='foo']", + ), + ], +) +def test_comp_special_canonicalization_ipv4(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv4-addr:value='1.2.3.4']", + "[ipv4-addr:value='1.2.3.5']", + ), + ( + "[ipv4-addr:value='1.2.3.4/1']", + "[ipv4-addr:value='1.2.3.4/2']", + ), + ( + "[ipv4-addr:value='foo']", + "[ipv4-addr:value='bar']", + ), + ], +) +def test_comp_special_canonicalization_ipv4_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/128']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/112']", + "[ipv6-addr:value='1:2:3:4:5:6:7:0/112']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/111']", + "[ipv6-addr:value='1:2:3:4:5:6:fffe:0/111']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:ffff:8/104']", + "[ipv6-addr:value='1:2:3:4:5:6:ff00:0/104']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/0']", + "[ipv6-addr:value='0:0:0:0:0:0:0:0/0']", + ), + ( + "[ipv6-addr:value='0001:0000:0000:0000:0000:0000:0000:0001']", + "[ipv6-addr:value='1::1']", + ), + ( + "[ipv6-addr:value='0000:0000:0000:0000:0000:0000:0000:0000']", + "[ipv6-addr:value='::']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8/-5']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8/99']", + ), + ( + "[ipv6-addr:value='foo']", + "[ipv6-addr:value='foo']", + ), + ], +) +def test_comp_special_canonicalization_ipv6(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8']", + "[ipv6-addr:value='1:2:3:4:5:6:7:9']", + ), + ( + "[ipv6-addr:value='1:2:3:4:5:6:7:8/1']", + "[ipv6-addr:value='1:2:3:4:5:6:7:8/2']", + ), + ( + "[ipv6-addr:value='foo']", + "[ipv6-addr:value='bar']", + ), + ], +) +def test_comp_special_canonicalization_ipv6_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[windows-registry-key:key = 'aaa']", + "[windows-registry-key:key = 'AAA']", + ), + ( + "[windows-registry-key:values[0].name = 'aaa']", + "[windows-registry-key:values[0].name = 'AAA']", + ), + ( + "[windows-registry-key:values[*].name = 'aaa']", + "[windows-registry-key:values[*].name = 
'AAA']", + ), + ], +) +def test_comp_special_canonicalization_win_reg_key(patt1, patt2): + assert equivalent_patterns(patt1, patt2) + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[windows-registry-key:key='foo']", + "[windows-registry-key:key='bar']", + ), + ( + "[windows-registry-key:values[0].name='foo']", + "[windows-registry-key:values[0].name='bar']", + ), + ( + "[windows-registry-key:values[*].name='foo']", + "[windows-registry-key:values[*].name='bar']", + ), + ( + "[windows-registry-key:values[*].data='foo']", + "[windows-registry-key:values[*].data='FOO']", + ), + ], +) +def test_comp_special_canonicalization_win_reg_key_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2) + + +def test_comp_other_constant_types(): + constants = [ + "1.23", + "1", + "true", + "false", + "h'4fa2'", + "b'ZmpoZWll'", + "t'1982-12-31T02:14:17.232Z'", + ] + + pattern_template = "[a:b={}]" + for i, const1 in enumerate(constants): + for j, const2 in enumerate(constants): + patt1 = pattern_template.format(const1) + patt2 = pattern_template.format(const2) + + if i == j: + assert equivalent_patterns(patt1, patt2) + else: + assert not equivalent_patterns(patt1, patt2) + + # can't use an "=" pattern with lists... + for const in constants: + patt1 = "[a:b={}]".format(const) + patt2 = "[a:b IN (1,2,3)]" + assert not equivalent_patterns(patt1, patt2) + + +# # # # +# # find_equivalent_patterns() tests # # +# # # # + +def test_find_equivalent_patterns(): + search_pattern = "[a:b=1]" + other_patterns = [ + "[a:b=2]", + "[a:b=1]", + "[a:b=1] WITHIN 1 SECONDS", + "[a:b=1] OR ([a:b=2] AND [a:b=1])", + "[(a:b=2 OR a:b=1) AND a:b=1]", + "[c:d=1]", + "[a:b>1]", + ] + + result = list( + find_equivalent_patterns(search_pattern, other_patterns), + ) + + assert result == [ + "[a:b=1]", + "[a:b=1] OR ([a:b=2] AND [a:b=1])", + "[(a:b=2 OR a:b=1) AND a:b=1]", + ] diff --git a/stix2/test/v20/test_pattern_equivalence.py b/stix2/test/v20/test_pattern_equivalence.py new file mode 100644 index 0000000..1ada5c7 --- /dev/null +++ b/stix2/test/v20/test_pattern_equivalence.py @@ -0,0 +1,47 @@ +""" +Pattern equivalence unit tests which use STIX 2.0-specific pattern features +""" + +import pytest + +from stix2.equivalence.patterns import equivalent_patterns + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'", + "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'", + ), + ( + "[a:b=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS", + "[a:b=1 OR (a:c=2 AND a:b=1)] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z' WITHIN 2 SECONDS", + ), + ( + "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES", + "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES", + ), + ], +) +def test_startstop_equivalent(patt1, patt2): + assert equivalent_patterns(patt1, patt2, stix_version="2.0") + + +@pytest.mark.parametrize( + "patt1, patt2", [ + ( + "[a:b!=1] START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'", + "[a:b!=1] START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'", + ), + ( + "[a:b<1] REPEATS 2 TIMES START '1993-06-29T15:24:42Z' STOP '2000-07-30T19:29:58Z'", + "[a:b<1] REPEATS 2 TIMES START '1977-09-29T07:41:03Z' STOP '1996-09-18T22:46:07Z'", + ), + ( + "[a:b=1] REPEATS 2 TIMES REPEATS 2 TIMES", + "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES", + ), + ], +) +def test_startstop_not_equivalent(patt1, patt2): + assert not equivalent_patterns(patt1, patt2, stix_version="2.0") diff --git 
a/stix2/test/v21/test_pattern_equivalence.py b/stix2/test/v21/test_pattern_equivalence.py
new file mode 100644
index 0000000..71ded69
--- /dev/null
+++ b/stix2/test/v21/test_pattern_equivalence.py
@@ -0,0 +1,47 @@
+"""
+Pattern equivalence unit tests which use pattern features specific to STIX 2.1 and later
+"""
+
+import pytest
+
+from stix2.equivalence.patterns import equivalent_patterns
+
+
+@pytest.mark.parametrize(
+    "patt1, patt2", [
+        (
+            "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+        ),
+        (
+            "[a:b=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
+            "[a:b=1 OR (a:c=2 AND a:b=1)] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z' WITHIN 2 SECONDS",
+        ),
+        (
+            "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES",
+            "([a:b=1] REPEATS 2 TIMES) REPEATS 2 TIMES",
+        ),
+    ],
+)
+def test_startstop_equivalent(patt1, patt2):
+    assert equivalent_patterns(patt1, patt2, stix_version="2.1")
+
+
+@pytest.mark.parametrize(
+    "patt1, patt2", [
+        (
+            "[a:b!=1] START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+            "[a:b!=1] START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'",
+        ),
+        (
+            "[a:b<1] REPEATS 2 TIMES START t'1993-06-29T15:24:42Z' STOP t'2000-07-30T19:29:58Z'",
+            "[a:b<1] REPEATS 2 TIMES START t'1977-09-29T07:41:03Z' STOP t'1996-09-18T22:46:07Z'",
+        ),
+        (
+            "([a:b=1]) REPEATS 2 TIMES REPEATS 2 TIMES",
+            "([a:b=1] REPEATS 2 TIMES) REPEATS 3 TIMES",
+        ),
+    ],
+)
+def test_startstop_not_equivalent(patt1, patt2):
+    assert not equivalent_patterns(patt1, patt2, stix_version="2.1")
diff --git a/stix2/test/v21/test_pattern_expressions.py b/stix2/test/v21/test_pattern_expressions.py
index 3ba0aa6..ac6a439 100644
--- a/stix2/test/v21/test_pattern_expressions.py
+++ b/stix2/test/v21/test_pattern_expressions.py
@@ -658,6 +658,7 @@ def test_parsing_integer_index():
     patt_obj = create_pattern_object("[a:b[1]=2]")
     assert str(patt_obj) == "[a:b[1] = 2]"
 
+# This case should never occur: the first path component is always parsed as a property_name, and property names should not be quoted.
 def test_parsing_quoted_first_path_component():
     patt_obj = create_pattern_object("[a:'b'[1]=2]")