From c0467da5f94cf0f0928e56d7dca6dbed75faa313 Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Tue, 27 Jun 2017 12:29:42 -0400 Subject: [PATCH] added classes for Pattern Expressions --- stix2/__init__.py | 10 ++ stix2/pattern_expressions.py | 187 +++++++++++++++++++++++++ stix2/test/test_pattern_expressions.py | 58 ++++++++ 3 files changed, 255 insertions(+) create mode 100644 stix2/pattern_expressions.py create mode 100644 stix2/test/test_pattern_expressions.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 18c0b33..b48742f 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -20,6 +20,16 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) +from .pattern_expressions import (AndBooleanExpression, + AndObservableExpression, + EqualityComparisonExpression, + FollowedByObservableExpression, + MatchesComparisonExpression, + ObservableExpression, OrBooleanExpression, + OrObservableExpression, + ParentheticalExpression, + QualifiedObservationExpression, + WithinQualifier) from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py new file mode 100644 index 0000000..37c692c --- /dev/null +++ b/stix2/pattern_expressions.py @@ -0,0 +1,187 @@ +from six import text_type + + +class PatternExpression(object): + + @staticmethod + def get_root_from_object_path(lhs): + path_as_parts = lhs.split(":") + return path_as_parts[0] + + @staticmethod + def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class ComparisonExpression(PatternExpression): + def __init__(self, operator, lhs, rhs, negated=False): + if operator == "=" and isinstance(rhs, list): + self.operator = "IN" + else: + self.operator = operator + self.lhs = lhs + self.rhs = rhs + self.negated = negated + self.root_type = self.get_root_from_object_path(lhs) + + def __str__(self): + if isinstance(self.rhs, list): + final_rhs = [] + for r in self.rhs: + final_rhs.append("'" + self.escape_quotes_and_backslashes(text_type(r)) + "'") + rhs_string = "(" + ", ".join(final_rhs) + ")" + else: + rhs_string = "'" + self.escape_quotes_and_backslashes(text_type(self.rhs)) + "'" + return self.lhs + (" NOT" if self.negated else "") + " " + self.operator + " " + rhs_string + + +class EqualityComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) + + +class GreaterThanComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) + + +class LessThanComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) + + +class GreaterThanEqualComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">=", lhs, rhs, negated) + + +class LessThanEqualComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<=", lhs, rhs, negated) + + +class InComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) + + +class LikeComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) + + +class MatchesComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) + + +# TODO: ISASUBSET, ISSUPERSET + + +class BooleanExpression(PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = [] + for arg in operands: + if not hasattr(self, "root_type"): + self.root_type = arg.root_type + elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": + raise ValueError("This expression cannot have a mixed root type") + self.operands.append(arg) + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append(str(o)) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndBooleanExpression(BooleanExpression): + def __init__(self, operands): + super(AndBooleanExpression, self).__init__("AND", operands) + + +class OrBooleanExpression(BooleanExpression): + def __init__(self, operands): + super(OrBooleanExpression, self).__init__("OR", operands) + + +class ObservableExpression(PatternExpression): + def __init__(self, operand): + self.operand = operand + + def __str__(self): + return "[" + str(self.operand) + "]" + + +class CompoundObservableExpression(PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = operands + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append(str(o)) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndObservableExpression(CompoundObservableExpression): + def __init__(self, operands): + super(AndObservableExpression, self).__init__("AND", operands) + + +class OrObservableExpression(CompoundObservableExpression): + def __init__(self, operands): + super(OrObservableExpression, self).__init__("OR", operands) + + +class FollowedByObservableExpression(CompoundObservableExpression): + def __init__(self, operands): + super(FollowedByObservableExpression, self).__init__("FOLLOWEDBY", operands) + + +class ParentheticalExpression(PatternExpression): + def __init__(self, exp): + self.expression = exp + + def __str__(self): + return "(" + str(self.expression) + ")" + + +class ExpressionQualifier(PatternExpression): + pass + + +class RepeatQualifier(ExpressionQualifier): + def __init__(self, times_to_repeat): + self.times_to_repeat = times_to_repeat + + def __str__(self): + return "REPEATS %s TIMES" % str(self.times_to_repeat) + + +class WithinQualifier(ExpressionQualifier): + def __init__(self, number_of_seconds): + self.number_of_seconds = number_of_seconds + + def __str__(self): + return "WITHIN %s SECONDS" % (str(self.number_of_seconds)) + + +class StartStopQualifier(ExpressionQualifier): + def __init__(self, start_time, stop_time): + self.start_time = start_time + self.stop_time = stop_time + + def __str__(self): + return "START %s STOP %s" % (str(self.start_time), str(self.stop_time)) + + +class QualifiedObservationExpression(PatternExpression): + def __init__(self, observation_expression, qualifier): + self.observation_expression = observation_expression + self.qualifier = qualifier + + def __str__(self): + return str(self.observation_expression) + " " + str(self.qualifier) diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py new file mode 100644 index 0000000..b86febb --- /dev/null +++ b/stix2/test/test_pattern_expressions.py @@ -0,0 +1,58 @@ +import stix2 + + +def test_create_comparison_expression(): + + exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'" + + +def test_boolean_expression(): + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$") + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$") + exp = stix2.AndBooleanExpression([exp1, exp2]) + assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa + + +def test_boolean_expression_with_parentheses(): + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$") + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$") + exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2])) + assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa + + +def test_hash_followed_by_registryKey_expression(): + hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", + "79054025255fb1a26e4bc422aef54eb4") + o_exp1 = stix2.ObservableExpression(hash_exp) + reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key", + "HKEY_LOCAL_MACHINE\\foo\\bar") + o_exp2 = stix2.ObservableExpression(reg_exp) + fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + para_exp = stix2.ParentheticalExpression(fb_exp) + qual_exp = stix2.WithinQualifier(300) + exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + + +def test_file_observable_expression(): + exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + exp2 = stix2.EqualityComparisonExpression("file:mime_type", "application/x-pdf") + bool_exp = stix2.AndBooleanExpression([exp1, exp2]) + exp = stix2.ObservableExpression(bool_exp) + assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa + + +def test_multiple_file_observable_expression(): + exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c") + exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5", + "cead3f77f6cda6ec00f57d76c9a6879f") + bool1_exp = stix2.OrBooleanExpression([exp1, exp2]) + exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + op1_exp = stix2.ObservableExpression(bool1_exp) + op2_exp = stix2.ObservableExpression(exp3) + exp = stix2.AndObservableExpression([op1_exp, op2_exp]) + assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa