363 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			363 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
import importlib
 | 
						|
import inspect
 | 
						|
 | 
						|
from antlr4 import CommonTokenStream, InputStream
 | 
						|
import six
 | 
						|
from stix2patterns.grammars.STIXPatternLexer import STIXPatternLexer
 | 
						|
from stix2patterns.grammars.STIXPatternParser import (
 | 
						|
    STIXPatternParser, TerminalNode,
 | 
						|
)
 | 
						|
from stix2patterns.grammars.STIXPatternVisitor import STIXPatternVisitor
 | 
						|
from stix2patterns.validator import STIXPatternErrorListener
 | 
						|
 | 
						|
from .patterns import *
 | 
						|
from .patterns import _BooleanExpression
 | 
						|
 | 
						|
# flake8: noqa F405
 | 
						|
 | 
						|
 | 
						|
def collapse_lists(lists):
    """Flatten one level of nesting.

    Every element of ``lists`` that is a ``list`` contributes its items to the
    result; any other element (including tuples and strings) is kept as a
    single item.  Only one level is flattened, so lists nested deeper stay
    nested.

    :param lists: iterable of items, some of which may be lists
    :return: a new flat list
    """
    return [
        element
        for item in lists
        # Wrap non-list items in a 1-tuple so both cases can be iterated alike.
        for element in (item if isinstance(item, list) else (item,))
    ]
 | 
						|
 | 
						|
 | 
						|
def remove_terminal_nodes(parse_tree_nodes):
    """Return the given nodes with every ``TerminalNode`` instance filtered out.

    :param parse_tree_nodes: iterable of visit results / parse tree nodes
    :return: a new list holding, in order, the items that are not
        ``TerminalNode`` instances
    """
    return [node for node in parse_tree_nodes if not isinstance(node, TerminalNode)]
 | 
						|
 | 
						|
 | 
						|
# This class defines a complete generic visitor for a parse tree produced by STIXPatternParser.
 | 
						|
 | 
						|
 | 
						|
class STIXPatternVisitorForSTIX2(STIXPatternVisitor):
    """Parse tree visitor that turns a STIX pattern parse tree into pattern objects.

    Most objects are created through :meth:`instantiate`, which first looks
    for a replacement class named ``<ClassName>For<module_suffix>`` among the
    classes harvested from the customization module given to the constructor,
    and otherwise uses the class of that name found in this module's globals.
    """

    # Classes harvested from customization modules, keyed by class name.
    # This is class-level state shared by every visitor instance.
    classes = {}

    # Names of the modules whose classes have already been copied into
    # ``classes``, so that each module is only inspected once.
    _loaded_modules = set()

    def __init__(self, module_suffix, module_name):
        """
        :param module_suffix: suffix used to build replacement class names
            (``<ClassName>For<module_suffix>``); falsy to disable replacements
        :param module_name: importable name of the module that defines the
            replacement classes; falsy to disable replacements
        """
        if module_suffix and module_name:
            self.module_suffix = module_suffix
            cache_owner = STIXPatternVisitorForSTIX2
            # Load every requested module once.  Checking only whether the
            # cache is non-empty would silently ignore a second, different
            # customization module.
            if module_name not in cache_owner._loaded_modules:
                module = importlib.import_module(module_name)
                for name, klass in inspect.getmembers(module, inspect.isclass):
                    cache_owner.classes[name] = klass
                cache_owner._loaded_modules.add(module_name)
        else:
            self.module_suffix = None
        # Name this class (not its parent) so the parent's initializer is
        # not skipped in the MRO.
        super(STIXPatternVisitorForSTIX2, self).__init__()

    def get_class(self, class_name):
        """Return the cached customization class called ``class_name``, or None."""
        return STIXPatternVisitorForSTIX2.classes.get(class_name)

    def instantiate(self, klass_name, *args):
        """Create an instance of ``klass_name``, preferring a customized class.

        :param klass_name: base name of the class to create
        :param args: positional arguments passed to the constructor
        :raises KeyError: if no customized class applies and ``klass_name``
            is not defined in this module's globals
        """
        klass_to_instantiate = None
        if self.module_suffix:
            klass_to_instantiate = self.get_class(klass_name + "For" + self.module_suffix)
        if not klass_to_instantiate:
            # use the classes in python_stix2
            klass_to_instantiate = globals()[klass_name]
        return klass_to_instantiate(*args)

    @staticmethod
    def _split_prop_test(children):
        """Split the children of a property test into its parts.

        A property test has three children (path, operator, value), or four
        when a NOT token sits between the path and the operator.

        :return: tuple ``(lhs, operator_node, rhs, has_not)``
        """
        if len(children) > 3:
            return children[0], children[2], children[3], True
        return children[0], children[1], children[2], False

    # Visit a parse tree produced by STIXPatternParser#pattern.
    def visitPattern(self, ctx):
        children = self.visitChildren(ctx)
        return children[0]

    # Visit a parse tree produced by STIXPatternParser#observationExpressions.
    def visitObservationExpressions(self, ctx):
        children = self.visitChildren(ctx)
        if len(children) == 1:
            return children[0]
        # children[1] is the token between the two operands.
        return FollowedByObservationExpression([children[0], children[2]])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionOr.
    def visitObservationExpressionOr(self, ctx):
        children = self.visitChildren(ctx)
        if len(children) == 1:
            return children[0]
        return self.instantiate("OrObservationExpression", [children[0], children[2]])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionAnd.
    def visitObservationExpressionAnd(self, ctx):
        children = self.visitChildren(ctx)
        if len(children) == 1:
            return children[0]
        return self.instantiate("AndObservationExpression", [children[0], children[2]])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionRepeated.
    def visitObservationExpressionRepeated(self, ctx):
        children = self.visitChildren(ctx)
        return self.instantiate("QualifiedObservationExpression", children[0], children[1])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionSimple.
    def visitObservationExpressionSimple(self, ctx):
        children = self.visitChildren(ctx)
        # children[0] and children[2] are the enclosing delimiter tokens.
        return self.instantiate("ObservationExpression", children[1])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionCompound.
    def visitObservationExpressionCompound(self, ctx):
        children = self.visitChildren(ctx)
        return self.instantiate("ObservationExpression", children[1])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionWithin.
    def visitObservationExpressionWithin(self, ctx):
        children = self.visitChildren(ctx)
        return self.instantiate("QualifiedObservationExpression", children[0], children[1])

    # Visit a parse tree produced by STIXPatternParser#observationExpressionStartStop.
    def visitObservationExpressionStartStop(self, ctx):
        children = self.visitChildren(ctx)
        return self.instantiate("QualifiedObservationExpression", children[0], children[1])

    # Visit a parse tree produced by STIXPatternParser#comparisonExpression.
    def visitComparisonExpression(self, ctx):
        children = self.visitChildren(ctx)
        if len(children) == 1:
            return children[0]
        if isinstance(children[0], _BooleanExpression):
            # Extend the existing boolean expression in place rather than
            # nesting a new one around it.
            children[0].operands.append(children[2])
            return children[0]
        return self.instantiate("OrBooleanExpression", [children[0], children[2]])

    # Visit a parse tree produced by STIXPatternParser#comparisonExpressionAnd.
    def visitComparisonExpressionAnd(self, ctx):
        children = self.visitChildren(ctx)
        if len(children) == 1:
            return children[0]
        if isinstance(children[0], _BooleanExpression):
            children[0].operands.append(children[2])
            return children[0]
        return self.instantiate("AndBooleanExpression", [children[0], children[2]])

    # Visit a parse tree produced by STIXPatternParser#propTestEqual.
    def visitPropTestEqual(self, ctx):
        children = self.visitChildren(ctx)
        lhs, operator, rhs, has_not = self._split_prop_test(children)
        is_inequality = operator.symbol.type != STIXPatternParser.EQ
        # A NOT in front of the operator flips its meaning, so "NOT !=" is a
        # plain equality and "NOT =" is an inequality.
        negated = is_inequality != has_not
        return self.instantiate("EqualityComparisonExpression", lhs, rhs, negated)

    # Visit a parse tree produced by STIXPatternParser#propTestOrder.
    def visitPropTestOrder(self, ctx):
        children = self.visitChildren(ctx)
        lhs, operator, rhs, has_not = self._split_prop_test(children)
        class_names = {
            STIXPatternParser.GT: "GreaterThanComparisonExpression",
            STIXPatternParser.LT: "LessThanComparisonExpression",
            STIXPatternParser.GE: "GreaterThanEqualComparisonExpression",
            STIXPatternParser.LE: "LessThanEqualComparisonExpression",
        }
        class_name = class_names.get(operator.symbol.type)
        if class_name is None:
            # Unknown operator token: nothing to build.
            return None
        return self.instantiate(class_name, lhs, rhs, has_not)

    # Visit a parse tree produced by STIXPatternParser#propTestSet.
    def visitPropTestSet(self, ctx):
        children = self.visitChildren(ctx)
        lhs, _, rhs, has_not = self._split_prop_test(children)
        return self.instantiate("InComparisonExpression", lhs, rhs, has_not)

    # Visit a parse tree produced by STIXPatternParser#propTestLike.
    def visitPropTestLike(self, ctx):
        children = self.visitChildren(ctx)
        lhs, _, rhs, has_not = self._split_prop_test(children)
        return self.instantiate("LikeComparisonExpression", lhs, rhs, has_not)

    # Visit a parse tree produced by STIXPatternParser#propTestRegex.
    def visitPropTestRegex(self, ctx):
        children = self.visitChildren(ctx)
        lhs, _, rhs, has_not = self._split_prop_test(children)
        return self.instantiate("MatchesComparisonExpression", lhs, rhs, has_not)

    # Visit a parse tree produced by STIXPatternParser#propTestIsSubset.
    def visitPropTestIsSubset(self, ctx):
        children = self.visitChildren(ctx)
        lhs, _, rhs, _ = self._split_prop_test(children)
        # NOTE(review): a NOT token is still ignored here because only two
        # constructor arguments are passed; confirm the constructor accepts a
        # negation flag before forwarding it.
        return self.instantiate("IsSubsetComparisonExpression", lhs, rhs)

    # Visit a parse tree produced by STIXPatternParser#propTestIsSuperset.
    def visitPropTestIsSuperset(self, ctx):
        children = self.visitChildren(ctx)
        lhs, _, rhs, _ = self._split_prop_test(children)
        # NOTE(review): a NOT token is still ignored here, see
        # visitPropTestIsSubset.
        return self.instantiate("IsSupersetComparisonExpression", lhs, rhs)

    # Visit a parse tree produced by STIXPatternParser#propTestParen.
    def visitPropTestParen(self, ctx):
        children = self.visitChildren(ctx)
        return self.instantiate("ParentheticalExpression", children[1])

    # Visit a parse tree produced by STIXPatternParser#startStopQualifier.
    def visitStartStopQualifier(self, ctx):
        children = self.visitChildren(ctx)
        return StartStopQualifier(children[1], children[3])

    # Visit a parse tree produced by STIXPatternParser#withinQualifier.
    def visitWithinQualifier(self, ctx):
        children = self.visitChildren(ctx)
        return WithinQualifier(children[1])

    # Visit a parse tree produced by STIXPatternParser#repeatedQualifier.
    def visitRepeatedQualifier(self, ctx):
        children = self.visitChildren(ctx)
        return RepeatQualifier(children[1])

    # Visit a parse tree produced by STIXPatternParser#objectPath.
    def visitObjectPath(self, ctx):
        children = self.visitChildren(ctx)
        # children[0] is the object type and children[1] the token after it;
        # everything from index 2 on belongs to the property path.
        flat_list = collapse_lists(children[2:])
        property_path = []
        i = 0
        while i < len(flat_list):
            current = flat_list[i]
            if i == len(flat_list) - 1:
                property_path.append(current)
                break
            following = flat_list[i + 1]
            if isinstance(following, TerminalNode):
                # A component followed by a bare terminal is a list access:
                # merge the two into a single list path component.
                property_path.append(
                    self.instantiate("ListObjectPathComponent", current.property_name, following.getText()),
                )
                i += 2
            else:
                property_path.append(current)
                i += 1
        return self.instantiate("ObjectPath", children[0].getText(), property_path)

    # Visit a parse tree produced by STIXPatternParser#objectType.
    def visitObjectType(self, ctx):
        children = self.visitChildren(ctx)
        return children[0]

    # Visit a parse tree produced by STIXPatternParser#firstPathComponent.
    def visitFirstPathComponent(self, ctx):
        children = self.visitChildren(ctx)
        step = children[0].getText()
        # NOTE: steps ending in "_ref" get no special treatment; every first
        # step becomes a basic component.
        return self.instantiate("BasicObjectPathComponent", step, False)

    # Visit a parse tree produced by STIXPatternParser#indexPathStep.
    def visitIndexPathStep(self, ctx):
        children = self.visitChildren(ctx)
        return children[1]

    # Visit a parse tree produced by STIXPatternParser#pathStep.
    def visitPathStep(self, ctx):
        return collapse_lists(self.visitChildren(ctx))

    # Visit a parse tree produced by STIXPatternParser#keyPathStep.
    def visitKeyPathStep(self, ctx):
        children = self.visitChildren(ctx)
        if isinstance(children[1], StringConstant):
            # special case for hashes
            return children[1].value
        return self.instantiate("BasicObjectPathComponent", children[1].getText(), True)

    # Visit a parse tree produced by STIXPatternParser#setLiteral.
    def visitSetLiteral(self, ctx):
        children = self.visitChildren(ctx)
        return self.instantiate("ListConstant", remove_terminal_nodes(children))

    # Visit a parse tree produced by STIXPatternParser#primitiveLiteral.
    def visitPrimitiveLiteral(self, ctx):
        children = self.visitChildren(ctx)
        return children[0]

    # Visit a parse tree produced by STIXPatternParser#orderableLiteral.
    def visitOrderableLiteral(self, ctx):
        children = self.visitChildren(ctx)
        return children[0]

    def visitTerminal(self, node):
        """Convert literal tokens into constant objects; return other tokens as is."""
        token_type = node.symbol.type
        if token_type in (STIXPatternParser.IntPosLiteral, STIXPatternParser.IntNegLiteral):
            return IntegerConstant(node.getText())
        if token_type in (STIXPatternParser.FloatPosLiteral, STIXPatternParser.FloatNegLiteral):
            return FloatConstant(node.getText())
        if token_type == STIXPatternParser.HexLiteral:
            return HexConstant(node.getText(), from_parse_tree=True)
        if token_type == STIXPatternParser.BinaryLiteral:
            return BinaryConstant(node.getText(), from_parse_tree=True)
        if token_type == STIXPatternParser.StringLiteral:
            text = node.getText()
            # Remove exactly one delimiting quote from each end.  Stripping
            # every quote character would also eat an escaped quote at the
            # very start or end of the string body.
            if len(text) >= 2 and text[0] == "'" and text[-1] == "'":
                text = text[1:-1]
            return StringConstant(text, from_parse_tree=True)
        if token_type == STIXPatternParser.BoolLiteral:
            return BooleanConstant(node.getText())
        if token_type == STIXPatternParser.TimestampLiteral:
            return TimestampConstant(node.getText())
        return node

    def aggregateResult(self, aggregate, nextResult):
        """Collect the results of visiting a node's children into a list.

        The list is started by the first truthy result; once it exists every
        later result is appended to it.  The visit methods above rely on the
        resulting positions when they index into ``children``.
        """
        if aggregate:
            aggregate.append(nextResult)
        elif nextResult:
            aggregate = [nextResult]
        return aggregate
 | 
						|
 | 
						|
 | 
						|
def create_pattern_object(pattern, module_suffix="", module_name=""):
    """
    Parse a STIX pattern and build the pattern object tree that represents it.

    :param pattern: the pattern, either as a string or as an object the
        lexer can read directly (e.g. an antlr4 ``InputStream``)
    :param module_suffix: suffix selecting customized pattern classes, see
        ``STIXPatternVisitorForSTIX2``; empty to use the default classes
    :param module_name: name of the module defining the customized classes;
        empty to use the default classes
    :return: the result of visiting the root of the parse tree
    """
    if isinstance(pattern, six.string_types):
        pattern = InputStream(pattern)

    parseErrListener = STIXPatternErrorListener()

    lexer = STIXPatternLexer(pattern)
    # it always adds a console listener by default... remove it.
    lexer.removeErrorListeners()

    stream = CommonTokenStream(lexer)

    parser = STIXPatternParser(stream)
    parser.buildParseTrees = True
    # it always adds a console listener by default... remove it.
    parser.removeErrorListeners()
    parser.addErrorListener(parseErrListener)

    # To improve error messages, replace "<INVALID>" in the literal
    # names with symbolic names.  This is a hack, but seemed like
    # the simplest workaround.
    for i, lit_name in enumerate(parser.literalNames):
        if lit_name == u"<INVALID>":
            parser.literalNames[i] = parser.symbolicNames[i]

    tree = parser.pattern()
    # NOTE(review): whatever parseErrListener recorded is not inspected here,
    # so a syntactically invalid pattern is still handed to the visitor.
    builder = STIXPatternVisitorForSTIX2(module_suffix, module_name)
    return builder.visit(tree)
 |