496 lines
16 KiB
Python
496 lines
16 KiB
Python
"""
|
|
Transformation utilities for STIX pattern observation expressions.
|
|
"""
|
|
import functools
|
|
import itertools
|
|
|
|
from stix2.equivalence.pattern.compare import iter_in, iter_lex_cmp
|
|
from stix2.equivalence.pattern.compare.observation import (
|
|
observation_expression_cmp,
|
|
)
|
|
from stix2.equivalence.pattern.transform import (
|
|
ChainTransformer, SettleTransformer, Transformer,
|
|
)
|
|
from stix2.equivalence.pattern.transform.comparison import (
|
|
SpecialValueCanonicalization,
|
|
)
|
|
from stix2.equivalence.pattern.transform.comparison import \
|
|
AbsorptionTransformer as CAbsorptionTransformer
|
|
from stix2.equivalence.pattern.transform.comparison import \
|
|
DNFTransformer as CDNFTransformer
|
|
from stix2.equivalence.pattern.transform.comparison import \
|
|
FlattenTransformer as CFlattenTransformer
|
|
from stix2.equivalence.pattern.transform.comparison import \
|
|
OrderDedupeTransformer as COrderDedupeTransformer
|
|
from stix2.patterns import (
|
|
AndObservationExpression, FollowedByObservationExpression,
|
|
ObservationExpression, OrObservationExpression, ParentheticalExpression,
|
|
QualifiedObservationExpression, _CompoundObservationExpression,
|
|
)
|
|
|
|
|
|
def _dupe_ast(ast):
|
|
"""
|
|
Create a duplicate of the given AST. The AST root must be an observation
|
|
expression of some kind (AND/OR/qualified, etc).
|
|
|
|
Note: the observation expression "leaves", i.e. simple square-bracket
|
|
observation expressions are currently not duplicated. I don't think it's
|
|
necessary as of this writing. But revisit this if/when necessary.
|
|
|
|
:param ast: The AST to duplicate
|
|
:return: The duplicate AST
|
|
"""
|
|
if isinstance(ast, AndObservationExpression):
|
|
result = AndObservationExpression([
|
|
_dupe_ast(child) for child in ast.operands
|
|
])
|
|
|
|
elif isinstance(ast, OrObservationExpression):
|
|
result = OrObservationExpression([
|
|
_dupe_ast(child) for child in ast.operands
|
|
])
|
|
|
|
elif isinstance(ast, FollowedByObservationExpression):
|
|
result = FollowedByObservationExpression([
|
|
_dupe_ast(child) for child in ast.operands
|
|
])
|
|
|
|
elif isinstance(ast, QualifiedObservationExpression):
|
|
# Don't need to dupe the qualifier object at this point
|
|
result = QualifiedObservationExpression(
|
|
_dupe_ast(ast.observation_expression), ast.qualifier,
|
|
)
|
|
|
|
elif isinstance(ast, ObservationExpression):
|
|
result = ast
|
|
|
|
else:
|
|
raise TypeError("Can't duplicate " + type(ast).__name__)
|
|
|
|
return result
|
|
|
|
|
|
class ObservationExpressionTransformer(Transformer):
|
|
"""
|
|
Transformer base class with special support for transforming observation
|
|
expressions. The transform method implemented here performs a bottom-up
|
|
in-place transformation, with support for some observation
|
|
expression-specific callbacks. It recurses down as far as the "leaf node"
|
|
observation expressions; it does not go inside of them, to the individual
|
|
components of a comparison expression.
|
|
|
|
Specifically, subclasses can implement methods:
|
|
"transform_or" for OR nodes
|
|
"transform_and" for AND nodes
|
|
"transform_followedby" for FOLLOWEDBY nodes
|
|
"transform_qualified" for qualified nodes (all qualifier types)
|
|
"transform_observation" for "leaf" observation expression nodes
|
|
"transform_default" for all types of nodes
|
|
|
|
"transform_default" is a fallback, if a type-specific callback is not
|
|
found. The default implementation does nothing to the AST. The
|
|
type-specific callbacks are preferred over the default, if both exist.
|
|
|
|
In all cases, the callbacks are called with an AST for a subtree rooted at
|
|
the appropriate node type, where the AST's children have already been
|
|
transformed. They must return the same thing as the base transform()
|
|
method: a 2-tuple with the transformed AST and a boolean for change
|
|
detection. See doc for the superclass' method.
|
|
|
|
This process currently silently drops parenthetical nodes.
|
|
"""
|
|
|
|
# Determines how AST node types map to callback method names
|
|
_DISPATCH_NAME_MAP = {
|
|
ObservationExpression: "observation",
|
|
AndObservationExpression: "and",
|
|
OrObservationExpression: "or",
|
|
FollowedByObservationExpression: "followedby",
|
|
QualifiedObservationExpression: "qualified",
|
|
}
|
|
|
|
def transform(self, ast):
|
|
|
|
changed = False
|
|
if isinstance(ast, ObservationExpression):
|
|
# A "leaf node" for observation expressions. We don't recurse into
|
|
# these.
|
|
result, this_changed = self.__dispatch_transform(ast)
|
|
if this_changed:
|
|
changed = True
|
|
|
|
elif isinstance(ast, _CompoundObservationExpression):
|
|
for i, operand in enumerate(ast.operands):
|
|
result, this_changed = self.transform(operand)
|
|
if this_changed:
|
|
ast.operands[i] = result
|
|
changed = True
|
|
|
|
result, this_changed = self.__dispatch_transform(ast)
|
|
if this_changed:
|
|
changed = True
|
|
|
|
elif isinstance(ast, QualifiedObservationExpression):
|
|
# I don't think we need to process/transform the qualifier by
|
|
# itself, do we?
|
|
result, this_changed = self.transform(ast.observation_expression)
|
|
if this_changed:
|
|
ast.observation_expression = result
|
|
changed = True
|
|
|
|
result, this_changed = self.__dispatch_transform(ast)
|
|
if this_changed:
|
|
changed = True
|
|
|
|
elif isinstance(ast, ParentheticalExpression):
|
|
result, _ = self.transform(ast.expression)
|
|
# Dropping a node is a change, right?
|
|
changed = True
|
|
|
|
else:
|
|
raise TypeError("Not an observation expression: {}: {}".format(
|
|
type(ast).__name__, str(ast),
|
|
))
|
|
|
|
return result, changed
|
|
|
|
def __dispatch_transform(self, ast):
|
|
"""
|
|
Invoke a transformer callback method based on the given ast root node
|
|
type.
|
|
|
|
:param ast: The AST
|
|
:return: The callback's result
|
|
"""
|
|
|
|
dispatch_name = self._DISPATCH_NAME_MAP.get(type(ast))
|
|
if dispatch_name:
|
|
meth_name = "transform_" + dispatch_name
|
|
meth = getattr(self, meth_name, self.transform_default)
|
|
else:
|
|
meth = self.transform_default
|
|
|
|
return meth(ast)
|
|
|
|
def transform_default(self, ast):
|
|
return ast, False
|
|
|
|
|
|
class FlattenTransformer(ObservationExpressionTransformer):
|
|
"""
|
|
Flatten an observation expression AST. E.g.:
|
|
|
|
A and (B and C) => A and B and C
|
|
A or (B or C) => A or B or C
|
|
A followedby (B followedby C) => A followedby B followedby C
|
|
(A) => A
|
|
"""
|
|
|
|
def __transform(self, ast):
|
|
|
|
changed = False
|
|
|
|
if len(ast.operands) == 1:
|
|
# Replace an AND/OR/FOLLOWEDBY with one child, with the child
|
|
# itself.
|
|
result = ast.operands[0]
|
|
changed = True
|
|
|
|
else:
|
|
flat_children = []
|
|
for operand in ast.operands:
|
|
if isinstance(operand, _CompoundObservationExpression) \
|
|
and ast.operator == operand.operator:
|
|
flat_children.extend(operand.operands)
|
|
changed = True
|
|
else:
|
|
flat_children.append(operand)
|
|
|
|
ast.operands = flat_children
|
|
result = ast
|
|
|
|
return result, changed
|
|
|
|
def transform_and(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
def transform_or(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
def transform_followedby(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
|
|
class OrderDedupeTransformer(
|
|
ObservationExpressionTransformer
|
|
):
|
|
"""
|
|
Canonically order AND/OR expressions, and dedupe ORs. E.g.:
|
|
|
|
A or A => A
|
|
B or A => A or B
|
|
B and A => A and B
|
|
"""
|
|
|
|
def __transform(self, ast):
|
|
sorted_children = sorted(
|
|
ast.operands, key=functools.cmp_to_key(observation_expression_cmp),
|
|
)
|
|
|
|
# Deduping only applies to ORs
|
|
if ast.operator == "OR":
|
|
deduped_children = [
|
|
key.obj for key, _ in itertools.groupby(
|
|
sorted_children, key=functools.cmp_to_key(
|
|
observation_expression_cmp,
|
|
),
|
|
)
|
|
]
|
|
else:
|
|
deduped_children = sorted_children
|
|
|
|
changed = iter_lex_cmp(
|
|
ast.operands, deduped_children, observation_expression_cmp,
|
|
) != 0
|
|
|
|
ast.operands = deduped_children
|
|
|
|
return ast, changed
|
|
|
|
def transform_and(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
def transform_or(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
|
|
class AbsorptionTransformer(
|
|
ObservationExpressionTransformer
|
|
):
|
|
"""
|
|
Applies boolean "absorption" rules for observation expressions, for AST
|
|
simplification:
|
|
|
|
A or (A and B) = A
|
|
A or (A followedby B) = A
|
|
|
|
Other variants do not hold for observation expressions.
|
|
"""
|
|
|
|
def __is_contained_and(self, exprs_containee, exprs_container):
|
|
"""
|
|
Determine whether the "containee" expressions are contained in the
|
|
"container" expressions, with AND semantics (order-independent but need
|
|
distinct bindings). For example (with containee on left and container
|
|
on right):
|
|
|
|
(A and A and B) or (A and B and C)
|
|
|
|
In the above, all of the lhs vars have a counterpart in the rhs, but
|
|
there are two A's on the left and only one on the right. Therefore,
|
|
the right does not "contain" the left. You would need two A's on the
|
|
right.
|
|
|
|
:param exprs_containee: The expressions we want to check for containment
|
|
:param exprs_container: The expressions acting as the "container"
|
|
:return: True if the containee is contained in the container; False if
|
|
not
|
|
"""
|
|
|
|
# make our own list we are free to manipulate without affecting the
|
|
# function args.
|
|
container = list(exprs_container)
|
|
|
|
result = True
|
|
for ee in exprs_containee:
|
|
for i, er in enumerate(container):
|
|
if observation_expression_cmp(ee, er) == 0:
|
|
# Found a match in the container; delete it so we never try
|
|
# to match a container expr to two different containee
|
|
# expressions.
|
|
del container[i]
|
|
break
|
|
else:
|
|
result = False
|
|
break
|
|
|
|
return result
|
|
|
|
def __is_contained_followedby(self, exprs_containee, exprs_container):
|
|
"""
|
|
Determine whether the "containee" expressions are contained in the
|
|
"container" expressions, with FOLLOWEDBY semantics (order-sensitive and
|
|
need distinct bindings). For example (with containee on left and
|
|
container on right):
|
|
|
|
(A followedby B) or (B followedby A)
|
|
|
|
In the above, all of the lhs vars have a counterpart in the rhs, but
|
|
the vars on the right are not in the same order. Therefore, the right
|
|
does not "contain" the left. The container vars don't have to be
|
|
contiguous though. E.g. in:
|
|
|
|
(A followedby B) or (D followedby A followedby C followedby B)
|
|
|
|
in the container (rhs), B follows A, so it "contains" the lhs even
|
|
though there is other stuff mixed in.
|
|
|
|
:param exprs_containee: The expressions we want to check for containment
|
|
:param exprs_container: The expressions acting as the "container"
|
|
:return: True if the containee is contained in the container; False if
|
|
not
|
|
"""
|
|
|
|
ee_iter = iter(exprs_containee)
|
|
er_iter = iter(exprs_container)
|
|
|
|
result = True
|
|
while True:
|
|
ee = next(ee_iter, None)
|
|
if not ee:
|
|
break
|
|
|
|
while True:
|
|
er = next(er_iter, None)
|
|
if er:
|
|
if observation_expression_cmp(ee, er) == 0:
|
|
break
|
|
else:
|
|
break
|
|
|
|
if not er:
|
|
result = False
|
|
break
|
|
|
|
return result
|
|
|
|
def transform_or(self, ast):
|
|
changed = False
|
|
to_delete = set()
|
|
for i, child1 in enumerate(ast.operands):
|
|
if i in to_delete:
|
|
continue
|
|
|
|
# The simplification doesn't work across qualifiers
|
|
if isinstance(child1, QualifiedObservationExpression):
|
|
continue
|
|
|
|
for j, child2 in enumerate(ast.operands):
|
|
if i == j or j in to_delete:
|
|
continue
|
|
|
|
if isinstance(
|
|
child2, (
|
|
AndObservationExpression,
|
|
FollowedByObservationExpression,
|
|
),
|
|
):
|
|
# The simple check: is child1 contained in child2?
|
|
if iter_in(
|
|
child1, child2.operands, observation_expression_cmp,
|
|
):
|
|
to_delete.add(j)
|
|
|
|
# A more complicated check: does child1 occur in child2
|
|
# in a "flattened" form?
|
|
elif type(child1) is type(child2):
|
|
if isinstance(child1, AndObservationExpression):
|
|
can_simplify = self.__is_contained_and(
|
|
child1.operands, child2.operands,
|
|
)
|
|
else: # child1 and 2 are followedby nodes
|
|
can_simplify = self.__is_contained_followedby(
|
|
child1.operands, child2.operands,
|
|
)
|
|
|
|
if can_simplify:
|
|
to_delete.add(j)
|
|
|
|
if to_delete:
|
|
changed = True
|
|
|
|
for i in reversed(sorted(to_delete)):
|
|
del ast.operands[i]
|
|
|
|
return ast, changed
|
|
|
|
|
|
class DNFTransformer(ObservationExpressionTransformer):
|
|
"""
|
|
Transform an observation expression to DNF. This will distribute AND and
|
|
FOLLOWEDBY over OR:
|
|
|
|
A and (B or C) => (A and B) or (A and C)
|
|
A followedby (B or C) => (A followedby B) or (A followedby C)
|
|
"""
|
|
|
|
def __transform(self, ast):
|
|
|
|
root_type = type(ast) # will be AST class for AND or FOLLOWEDBY
|
|
changed = False
|
|
or_children = []
|
|
other_children = []
|
|
for child in ast.operands:
|
|
if isinstance(child, OrObservationExpression):
|
|
or_children.append(child.operands)
|
|
else:
|
|
other_children.append(child)
|
|
|
|
if or_children:
|
|
distributed_children = [
|
|
root_type([
|
|
_dupe_ast(sub_ast) for sub_ast in itertools.chain(
|
|
other_children, prod_seq,
|
|
)
|
|
])
|
|
for prod_seq in itertools.product(*or_children)
|
|
]
|
|
|
|
# Need to recursively continue to distribute AND/FOLLOWEDBY over OR
|
|
# in any of our new sub-expressions which need it.
|
|
distributed_children = [
|
|
self.transform(child)[0] for child in distributed_children
|
|
]
|
|
|
|
result = OrObservationExpression(distributed_children)
|
|
changed = True
|
|
|
|
else:
|
|
result = ast
|
|
|
|
return result, changed
|
|
|
|
def transform_and(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
def transform_followedby(self, ast):
|
|
return self.__transform(ast)
|
|
|
|
|
|
class CanonicalizeComparisonExpressionsTransformer(
|
|
ObservationExpressionTransformer
|
|
):
|
|
"""
|
|
Canonicalize all comparison expressions.
|
|
"""
|
|
def __init__(self):
|
|
comp_flatten = CFlattenTransformer()
|
|
comp_order = COrderDedupeTransformer()
|
|
comp_absorb = CAbsorptionTransformer()
|
|
simplify = ChainTransformer(comp_flatten, comp_order, comp_absorb)
|
|
settle_simplify = SettleTransformer(simplify)
|
|
|
|
comp_special = SpecialValueCanonicalization()
|
|
comp_dnf = CDNFTransformer()
|
|
self.__comp_canonicalize = ChainTransformer(
|
|
comp_special, settle_simplify, comp_dnf, settle_simplify,
|
|
)
|
|
|
|
def transform_observation(self, ast):
|
|
comp_expr = ast.operand
|
|
canon_comp_expr, changed = self.__comp_canonicalize.transform(comp_expr)
|
|
ast.operand = canon_comp_expr
|
|
|
|
return ast, changed
|