2020-08-11 00:33:26 +02:00
|
|
|
"""
|
|
|
|
Transformation utilities for STIX pattern comparison expressions.
|
|
|
|
"""
|
|
|
|
import functools
|
|
|
|
import itertools
|
2020-08-13 23:44:42 +02:00
|
|
|
|
Graph Equivalence (#449)
* new packages for graph and object-based semantic equivalence
* new method graphically_equivalent for Environment, move equivalence methods out
* object equivalence function, methods used for object-based moved here.
* new graph_equivalence methods
* add notes
* add support for versioning checks (default disabled)
* new tests to cover graph equivalence and new methods
* added more imports to environment.py to prevent breaking changes
* variable changes, new fields for checks, reset depth check per call
* flexibility when object is not available on graph.
* refactor debug logging message
* new file stix2.equivalence.graph_equivalence.rst and stix2.equivalence.object_equivalence.rst for docs
* API documentation for new modules
* additional text required to build docs
* add more test methods for list_semantic_check and graphically_equivalent/versioning
* add logging debug messages, code clean-up
* include individual scoring on results dict, fix issue on list_semantic_check not keeping highest score
* include results as summary in prop_scores, minor tweaks
* Update __init__.py
docstrings update
* apply feedback from pull request
- rename semantic_check to reference_check
- rename modules to graph and object respectively to eliminate redundancy
- remove created_by_ref and object_marking_refs from graph WEIGHTS and rebalance
* update docs/ entries
* add more checks, make max score based on actual objects checked instead of the full list, only create entry when type is present in WEIGHTS dictionary
update tests to reflect changes
* rename package patterns -> pattern
* documentation, moving weights around
* more documentation moving
* rename WEIGHTS variable for graph_equivalence
2020-10-16 17:35:26 +02:00
|
|
|
from stix2.equivalence.pattern.compare import iter_in, iter_lex_cmp
|
|
|
|
from stix2.equivalence.pattern.compare.comparison import (
|
2020-08-13 23:44:42 +02:00
|
|
|
comparison_expression_cmp,
|
|
|
|
)
|
Graph Equivalence (#449)
* new packages for graph and object-based semantic equivalence
* new method graphically_equivalent for Environment, move equivalence methods out
* object equivalence function, methods used for object-based moved here.
* new graph_equivalence methods
* add notes
* add support for versioning checks (default disabled)
* new tests to cover graph equivalence and new methods
* added more imports to environment.py to prevent breaking changes
* variable changes, new fields for checks, reset depth check per call
* flexibility when object is not available on graph.
* refactor debug logging message
* new file stix2.equivalence.graph_equivalence.rst and stix2.equivalence.object_equivalence.rst for docs
* API documentation for new modules
* additional text required to build docs
* add more test methods for list_semantic_check and graphically_equivalent/versioning
* add logging debug messages, code clean-up
* include individual scoring on results dict, fix issue on list_semantic_check not keeping highest score
* include results as summary in prop_scores, minor tweaks
* Update __init__.py
docstrings update
* apply feedback from pull request
- rename semantic_check to reference_check
- rename modules to graph and object respectively to eliminate redundancy
- remove created_by_ref and object_marking_refs from graph WEIGHTS and rebalance
* update docs/ entries
* add more checks, make max score based on actual objects checked instead of the full list, only create entry when type is present in WEIGHTS dictionary
update tests to reflect changes
* rename package patterns -> pattern
* documentation, moving weights around
* more documentation moving
* rename WEIGHTS variable for graph_equivalence
2020-10-16 17:35:26 +02:00
|
|
|
from stix2.equivalence.pattern.transform import Transformer
|
|
|
|
from stix2.equivalence.pattern.transform.specials import (
|
2020-08-13 23:44:42 +02:00
|
|
|
ipv4_addr, ipv6_addr, windows_reg_key,
|
2020-08-13 01:28:35 +02:00
|
|
|
)
|
2020-08-11 00:33:26 +02:00
|
|
|
from stix2.patterns import (
|
2020-08-13 23:44:42 +02:00
|
|
|
AndBooleanExpression, OrBooleanExpression, ParentheticalExpression,
|
|
|
|
_BooleanExpression, _ComparisonExpression,
|
2020-08-11 00:33:26 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def _dupe_ast(ast):
    """
    Build a structural copy of a comparison expression AST.

    Note:
        Only the AND/OR nodes are re-created.  Leaf comparison expressions
        (simple <path> <op> <value> nodes) are shared between the original
        and the copy rather than duplicated; nothing currently modifies
        them, so sharing is considered safe.  Revisit if that ever changes.

    Args:
        ast: The AST to duplicate

    Returns:
        The duplicate AST

    Raises:
        TypeError: If a node is neither an AND/OR expression nor a simple
            comparison expression.
    """
    # Boolean nodes are rebuilt from recursively duplicated operands.  The
    # operands are handed over as a real list, exactly as the constructors
    # were called before.
    for node_class in (AndBooleanExpression, OrBooleanExpression):
        if isinstance(ast, node_class):
            return node_class([
                _dupe_ast(child) for child in ast.operands
            ])

    if isinstance(ast, _ComparisonExpression):
        # Change this to create a dupe, if we ever need to change simple
        # comparison expressions as part of normalization.
        return ast

    raise TypeError("Can't duplicate " + type(ast).__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class ComparisonExpressionTransformer(Transformer):
    """
    Base class for transformers which operate on comparison expression ASTs.
    transform() walks the tree bottom-up, modifying it in place, and hands
    each node to a node-type-specific callback once that node's children
    have been processed.

    Subclasses may implement any of these callbacks:

        "transform_or" for OR nodes
        "transform_and" for AND nodes
        "transform_comparison" for plain comparison nodes (<prop> <op> <value>)
        "transform_default" for any node lacking a more specific callback

    A type-specific callback always wins over "transform_default" when both
    exist.  The "transform_default" implementation provided here leaves the
    AST untouched.

    Every callback receives the AST of a subtree rooted at the relevant node
    type, whose children have already been transformed, and must return the
    same thing transform() returns: a 2-tuple of the transformed AST and a
    boolean for change detection.  See doc for the superclass' method.

    Parenthetical nodes are silently dropped by this process.
    """

    def transform(self, ast):
        """
        Transform the given comparison expression AST bottom-up.

        Args:
            ast: The AST to transform

        Returns:
            A 2-tuple: the transformed AST, and a boolean which is True if
            any callback invoked along the way reported a change.

        Raises:
            TypeError: If ast is not a boolean, comparison or parenthetical
                expression node.
        """
        if isinstance(ast, _BooleanExpression):
            changed = False

            # Children first; each result is written back into its slot.
            for idx, child in enumerate(ast.operands):
                new_child, child_changed = self.transform(child)
                changed = changed or bool(child_changed)
                ast.operands[idx] = new_child

            # Then the node itself, now that its children are settled.
            result, node_changed = self.__dispatch_transform(ast)
            return result, changed or bool(node_changed)

        if isinstance(ast, _ComparisonExpression):
            return self.__dispatch_transform(ast)

        if isinstance(ast, ParentheticalExpression):
            # Drop these: the wrapped expression takes the node's place.
            return self.transform(ast.expression)

        raise TypeError("Not a comparison expression: " + str(ast))

    def __dispatch_transform(self, ast):
        """
        Look up and invoke the callback appropriate to the type of the given
        AST's root node, falling back to transform_default when the
        type-specific callback is not implemented.

        Args:
            ast: The AST

        Returns:
            The callback's result
        """
        callbacks_by_type = (
            (AndBooleanExpression, "transform_and"),
            (OrBooleanExpression, "transform_or"),
            (_ComparisonExpression, "transform_comparison"),
        )

        callback_name = "transform_default"
        for node_class, name in callbacks_by_type:
            if isinstance(ast, node_class):
                callback_name = name
                break

        callback = getattr(self, callback_name, self.transform_default)
        return callback(ast)

    def transform_default(self, ast):
        """
        Fallback callback for nodes without a more specific callback.  This
        implementation returns the AST unchanged; override to do more.
        """
        return ast, False
|
|
|
|
|
|
|
|
|
|
|
|
class OrderDedupeTransformer(ComparisonExpressionTransformer):
    """
    Put the children of every AND/OR node into sorted order.  Deduping is
    done from the sorted data, so this transformation removes duplicate
    children as well.

    E.g.:
        A and A => A
        A or A => A
    """

    def __transform(self, ast):
        """
        Sort/dedupe children.  AND and OR nodes are handled identically.

        Args:
            ast: The comparison expression AST

        Returns:
            A 2-tuple: the same AST node with sorted, deduplicated children,
            and a boolean which is True if the resulting child sequence
            compares unequal to the original one.
        """
        as_key = functools.cmp_to_key(comparison_expression_cmp)

        ordered = sorted(ast.operands, key=as_key)

        # With a key function, groupby() yields the key wrapper objects as
        # its group keys rather than the sequence values themselves, so the
        # wrapped node has to be pulled back out of each wrapper.
        unique = [
            wrapper.obj
            for wrapper, _ in itertools.groupby(ordered, key=as_key)
        ]

        # Compare against the original order *before* replacing it.
        difference = iter_lex_cmp(
            ast.operands, unique, comparison_expression_cmp,
        )

        ast.operands = unique

        return ast, difference != 0

    def transform_or(self, ast):
        return self.__transform(ast)

    def transform_and(self, ast):
        return self.__transform(ast)
|
|
|
|
|
2020-08-11 00:33:26 +02:00
|
|
|
|
|
|
|
class FlattenTransformer(ComparisonExpressionTransformer):
    """
    Flatten all nodes of the AST.  E.g.:

        A and (B and C) => A and B and C
        A or (B or C) => A or B or C
        (A) => A
    """

    def __transform(self, ast):
        """
        Flatten children.  AND and OR are handled almost identically; the
        only distinction is that a node may absorb only those children
        which have its own operator (AND into AND, OR into OR).

        Args:
            ast: The comparison expression AST

        Returns:
            A 2-tuple: the flattened AST (the sole child, if the node had
            exactly one operand; otherwise the same node with flattened
            children), and a boolean which is True if anything was
            flattened.
        """
        if len(ast.operands) == 1:
            # Replace an AND/OR with one child, with the child itself.
            return ast.operands[0], True

        changed = False
        merged = []
        for child in ast.operands:
            same_kind = (
                isinstance(child, _BooleanExpression)
                and child.operator == ast.operator
            )
            if same_kind:
                # Splice the grandchildren in where the child used to be.
                merged.extend(child.operands)
                changed = True
            else:
                merged.append(child)

        ast.operands = merged

        return ast, changed

    def transform_or(self, ast):
        return self.__transform(ast)

    def transform_and(self, ast):
        return self.__transform(ast)
|
|
|
|
|
2020-08-11 00:33:26 +02:00
|
|
|
|
|
|
|
class AbsorptionTransformer(ComparisonExpressionTransformer):
    """
    Simplify the AST using the boolean "absorption" rules.  E.g.:

        A and (A or B) = A
        A or (A and B) = A
    """

    def __transform(self, ast):
        """
        Remove children of the given AND/OR node which are absorbed by one
        of their siblings.  AND and OR are handled identically, apart from
        which operator an absorbable child must have.

        Args:
            ast: The comparison expression AST

        Returns:
            A 2-tuple: the same AST node with absorbed children deleted,
            and a boolean which is True if any child was deleted.
        """
        # Only children with the "opposite" operator can be absorbed.
        absorbable_op = "OR" if ast.operator != "OR" else "AND"

        doomed = set()

        # Check i (absorber) against j (candidate) to see if we can delete j.
        for i, absorber in enumerate(ast.operands):
            if i in doomed:
                continue

            for j, candidate in enumerate(ast.operands):
                if j == i or j in doomed:
                    continue

                # We're checking whether the absorber is contained in the
                # candidate, so the candidate has to be a compound node
                # rather than a simple comparison expression, and it needs
                # the right operator: "AND" if ast is "OR" and vice versa.
                is_absorbable = (
                    isinstance(candidate, _BooleanExpression)
                    and candidate.operator == absorbable_op
                )
                if not is_absorbable:
                    continue

                # The simple check: is the absorber itself one of the
                # candidate's operands?
                if iter_in(
                    absorber, candidate.operands, comparison_expression_cmp,
                ):
                    doomed.add(j)

                # A more complicated check: does the absorber occur in the
                # candidate in a "flattened" form, i.e. same operator and
                # every one of its operands present among the candidate's?
                elif absorber.operator == candidate.operator and all(
                    iter_in(
                        part, candidate.operands, comparison_expression_cmp,
                    )
                    for part in absorber.operands
                ):
                    doomed.add(j)

        # Delete from the highest index down so that the remaining indices
        # stay valid.
        for idx in sorted(doomed, reverse=True):
            del ast.operands[idx]

        return ast, bool(doomed)

    def transform_or(self, ast):
        return self.__transform(ast)

    def transform_and(self, ast):
        return self.__transform(ast)
|
|
|
|
|
2020-08-11 00:33:26 +02:00
|
|
|
|
|
|
|
class DNFTransformer(ComparisonExpressionTransformer):
    """
    Convert a comparison expression AST to DNF.  E.g.:

        A and (B or C) => (A and B) or (A and C)
    """
    def transform_and(self, ast):
        """
        Distribute an AND node over any OR children it has.

        Args:
            ast: The AND node of a comparison expression AST

        Returns:
            A 2-tuple.  If the node has no OR children: the same node and
            False.  Otherwise: a new OR node whose children are the
            distributed (and further transformed) AND sub-expressions, and
            True.
        """
        # Split the AND's children into two piles: the operand lists of the
        # OR children (lists, so their product can be taken below), and
        # everything else.
        or_operand_lists = []
        remaining = []
        for child in ast.operands:
            if isinstance(child, _BooleanExpression) and child.operator == "OR":
                or_operand_lists.append(child.operands)
            else:
                remaining.append(child)

        if not or_operand_lists:
            # No AND-over-OR; nothing to do
            return ast, False

        conjunctions = []
        for combo in itertools.product(*or_operand_lists):
            # Make dupes: distribution implies adding repetition, and each
            # repetition should be independent of the others.
            conjunctions.append(
                AndBooleanExpression([
                    _dupe_ast(node)
                    for node in itertools.chain(remaining, combo)
                ]),
            )

        # The new sub-expressions may themselves need AND distributed over
        # OR, so recurse into each of them.  This is downward recursion in
        # the middle of a bottom-up transform, which is not good for
        # performance.  Perhaps a top-down algorithm would make more sense
        # in this phase?  But then two different algorithms would be in use
        # for the same thing...  Maybe this transform should be completely
        # top-down (no bottom-up component at all)?
        disjuncts = [
            self.transform(conjunction)[0] for conjunction in conjunctions
        ]

        return OrBooleanExpression(disjuncts), True
|
2020-08-13 01:28:35 +02:00
|
|
|
|
|
|
|
|
|
|
|
class SpecialValueCanonicalization(ComparisonExpressionTransformer):
    """
    Look for particular leaf-node comparison expressions whose rhs (i.e. the
    constant) can be canonicalized.  This is an idiosyncratic transformation
    based on some ideas people had for context-sensitive semantic
    equivalence in constant values.
    """
    def transform_comparison(self, ast):
        """
        Hand the comparison expression to the special-value handler for its
        object type, if there is one.

        Args:
            ast: A simple comparison expression node

        Returns:
            A 2-tuple: the same node, and False (always; see below).
        """
        handlers = {
            "windows-registry-key": windows_reg_key,
            "ipv4-addr": ipv4_addr,
            "ipv6-addr": ipv6_addr,
        }

        handler = handlers.get(ast.lhs.object_type_name)
        if handler is not None:
            handler(ast)

        # Hard-code False here since this particular canonicalization is
        # never worth doing more than once.  I think it's okay to pretend
        # nothing has changed.
        return ast, False
|