From c0467da5f94cf0f0928e56d7dca6dbed75faa313 Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Tue, 27 Jun 2017 12:29:42 -0400 Subject: [PATCH 1/7] added classes for Pattern Expressions --- stix2/__init__.py | 10 ++ stix2/pattern_expressions.py | 187 +++++++++++++++++++++++++ stix2/test/test_pattern_expressions.py | 58 ++++++++ 3 files changed, 255 insertions(+) create mode 100644 stix2/pattern_expressions.py create mode 100644 stix2/test/test_pattern_expressions.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 18c0b33..b48742f 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -20,6 +20,16 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) +from .pattern_expressions import (AndBooleanExpression, + AndObservableExpression, + EqualityComparisonExpression, + FollowedByObservableExpression, + MatchesComparisonExpression, + ObservableExpression, OrBooleanExpression, + OrObservableExpression, + ParentheticalExpression, + QualifiedObservationExpression, + WithinQualifier) from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py new file mode 100644 index 0000000..37c692c --- /dev/null +++ b/stix2/pattern_expressions.py @@ -0,0 +1,187 @@ +from six import text_type + + +class PatternExpression(object): + + @staticmethod + def get_root_from_object_path(lhs): + path_as_parts = lhs.split(":") + return path_as_parts[0] + + @staticmethod + def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class ComparisonExpression(PatternExpression): + def __init__(self, operator, lhs, rhs, negated=False): + if operator == "=" and isinstance(rhs, list): + self.operator = "IN" + else: + self.operator = operator + self.lhs = lhs + self.rhs = rhs + self.negated = negated + self.root_type = self.get_root_from_object_path(lhs) + + def __str__(self): + if isinstance(self.rhs, list): + final_rhs = [] + for r in self.rhs: + final_rhs.append("'" + self.escape_quotes_and_backslashes(text_type(r)) + "'") + rhs_string = "(" + ", ".join(final_rhs) + ")" + else: + rhs_string = "'" + self.escape_quotes_and_backslashes(text_type(self.rhs)) + "'" + return self.lhs + (" NOT" if self.negated else "") + " " + self.operator + " " + rhs_string + + +class EqualityComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) + + +class GreaterThanComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) + + +class LessThanComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) + + +class GreaterThanEqualComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">=", lhs, rhs, negated) + + +class LessThanEqualComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<=", lhs, rhs, negated) + + +class InComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) + + +class LikeComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) + + +class MatchesComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) + + +# TODO: ISASUBSET, ISSUPERSET + + +class BooleanExpression(PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = [] + for arg in operands: + if not hasattr(self, "root_type"): + self.root_type = arg.root_type + elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": + raise ValueError("This expression cannot have a mixed root type") + self.operands.append(arg) + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append(str(o)) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndBooleanExpression(BooleanExpression): + def __init__(self, operands): + super(AndBooleanExpression, self).__init__("AND", operands) + + +class OrBooleanExpression(BooleanExpression): + def __init__(self, operands): + super(OrBooleanExpression, self).__init__("OR", operands) + + +class ObservableExpression(PatternExpression): + def __init__(self, operand): + self.operand = operand + + def __str__(self): + return "[" + str(self.operand) + "]" + + +class CompoundObservableExpression(PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = operands + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append(str(o)) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndObservableExpression(CompoundObservableExpression): + def __init__(self, operands): + super(AndObservableExpression, self).__init__("AND", operands) + + +class OrObservableExpression(CompoundObservableExpression): + def __init__(self, operands): + super(OrObservableExpression, self).__init__("OR", operands) + + +class FollowedByObservableExpression(CompoundObservableExpression): + def __init__(self, operands): + super(FollowedByObservableExpression, self).__init__("FOLLOWEDBY", operands) + + +class ParentheticalExpression(PatternExpression): + def __init__(self, exp): + self.expression = exp + + def __str__(self): + return "(" + str(self.expression) + ")" + + +class ExpressionQualifier(PatternExpression): + pass + + +class RepeatQualifier(ExpressionQualifier): + def __init__(self, times_to_repeat): + self.times_to_repeat = times_to_repeat + + def __str__(self): + return "REPEATS %s TIMES" % str(self.times_to_repeat) + + +class WithinQualifier(ExpressionQualifier): + def __init__(self, number_of_seconds): + self.number_of_seconds = number_of_seconds + + def __str__(self): + return "WITHIN %s SECONDS" % (str(self.number_of_seconds)) + + +class StartStopQualifier(ExpressionQualifier): + def __init__(self, start_time, stop_time): + self.start_time = start_time + self.stop_time = stop_time + + def __str__(self): + return "START %s STOP %s" % (str(self.start_time), str(self.stop_time)) + + +class QualifiedObservationExpression(PatternExpression): + def __init__(self, observation_expression, qualifier): + self.observation_expression = observation_expression + self.qualifier = qualifier + + def __str__(self): + return str(self.observation_expression) + " " + str(self.qualifier) diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py new file mode 100644 index 0000000..b86febb --- /dev/null +++ b/stix2/test/test_pattern_expressions.py @@ -0,0 +1,58 @@ +import stix2 + + +def test_create_comparison_expression(): + + exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'" + + +def test_boolean_expression(): + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$") + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$") + exp = stix2.AndBooleanExpression([exp1, exp2]) + assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa + + +def test_boolean_expression_with_parentheses(): + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$") + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$") + exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2])) + assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa + + +def test_hash_followed_by_registryKey_expression(): + hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", + "79054025255fb1a26e4bc422aef54eb4") + o_exp1 = stix2.ObservableExpression(hash_exp) + reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key", + "HKEY_LOCAL_MACHINE\\foo\\bar") + o_exp2 = stix2.ObservableExpression(reg_exp) + fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + para_exp = stix2.ParentheticalExpression(fb_exp) + qual_exp = stix2.WithinQualifier(300) + exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + + +def test_file_observable_expression(): + exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + exp2 = stix2.EqualityComparisonExpression("file:mime_type", "application/x-pdf") + bool_exp = stix2.AndBooleanExpression([exp1, exp2]) + exp = stix2.ObservableExpression(bool_exp) + assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa + + +def test_multiple_file_observable_expression(): + exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c") + exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5", + "cead3f77f6cda6ec00f57d76c9a6879f") + bool1_exp = stix2.OrBooleanExpression([exp1, exp2]) + exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + op1_exp = stix2.ObservableExpression(bool1_exp) + op2_exp = stix2.ObservableExpression(exp3) + exp = stix2.AndObservableExpression([op1_exp, op2_exp]) + assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa From c8bcece6f6b693b1cfd67113caac8eaede618201 Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Thu, 6 Jul 2017 10:06:24 -0400 Subject: [PATCH 2/7] added tests for expressions fix __str__ methods --- stix2/__init__.py | 4 +++- stix2/pattern_expressions.py | 26 ++++++++++++++------------ stix2/test/test_pattern_expressions.py | 11 +++++++++++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/stix2/__init__.py b/stix2/__init__.py index b48742f..1974118 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -22,10 +22,12 @@ from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, MarkingDefinition, StatementMarking, TLPMarking) from .pattern_expressions import (AndBooleanExpression, AndObservableExpression, + ComparisonExpression, EqualityComparisonExpression, FollowedByObservableExpression, MatchesComparisonExpression, - ObservableExpression, OrBooleanExpression, + ObservableExpression, + OrBooleanExpression, OrObservableExpression, ParentheticalExpression, QualifiedObservationExpression, diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py index 37c692c..e952db9 100644 --- a/stix2/pattern_expressions.py +++ b/stix2/pattern_expressions.py @@ -1,5 +1,3 @@ -from six import text_type - class PatternExpression(object): @@ -28,10 +26,10 @@ class ComparisonExpression(PatternExpression): if isinstance(self.rhs, list): final_rhs = [] for r in self.rhs: - final_rhs.append("'" + self.escape_quotes_and_backslashes(text_type(r)) + "'") + final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") rhs_string = "(" + ", ".join(final_rhs) + ")" else: - rhs_string = "'" + self.escape_quotes_and_backslashes(text_type(self.rhs)) + "'" + rhs_string = "'" + self.escape_quotes_and_backslashes("%s" % self.rhs) + "'" return self.lhs + (" NOT" if self.negated else "") + " " + self.operator + " " + rhs_string @@ -87,12 +85,14 @@ class BooleanExpression(PatternExpression): self.root_type = arg.root_type elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": raise ValueError("This expression cannot have a mixed root type") + elif self.root_type and (self.root_type != arg.root_type): + self.root_type = None self.operands.append(arg) def __str__(self): sub_exprs = [] for o in self.operands: - sub_exprs.append(str(o)) + sub_exprs.append("%s" % o) return (" " + self.operator + " ").join(sub_exprs) @@ -111,7 +111,7 @@ class ObservableExpression(PatternExpression): self.operand = operand def __str__(self): - return "[" + str(self.operand) + "]" + return "[%s]" % self.operand class CompoundObservableExpression(PatternExpression): @@ -122,7 +122,7 @@ class CompoundObservableExpression(PatternExpression): def __str__(self): sub_exprs = [] for o in self.operands: - sub_exprs.append(str(o)) + sub_exprs.append("%s" % o) return (" " + self.operator + " ").join(sub_exprs) @@ -144,9 +144,11 @@ class FollowedByObservableExpression(CompoundObservableExpression): class ParentheticalExpression(PatternExpression): def __init__(self, exp): self.expression = exp + if hasattr(exp, "root_type"): + self.root_type = exp.root_type def __str__(self): - return "(" + str(self.expression) + ")" + return "(%s)" % self.expression class ExpressionQualifier(PatternExpression): @@ -158,7 +160,7 @@ class RepeatQualifier(ExpressionQualifier): self.times_to_repeat = times_to_repeat def __str__(self): - return "REPEATS %s TIMES" % str(self.times_to_repeat) + return "REPEATS %s TIMES" % self.times_to_repeat class WithinQualifier(ExpressionQualifier): @@ -166,7 +168,7 @@ class WithinQualifier(ExpressionQualifier): self.number_of_seconds = number_of_seconds def __str__(self): - return "WITHIN %s SECONDS" % (str(self.number_of_seconds)) + return "WITHIN %s SECONDS" % self.number_of_seconds class StartStopQualifier(ExpressionQualifier): @@ -175,7 +177,7 @@ class StartStopQualifier(ExpressionQualifier): self.stop_time = stop_time def __str__(self): - return "START %s STOP %s" % (str(self.start_time), str(self.stop_time)) + return "START %s STOP %s" % (self.start_time, self.stop_time) class QualifiedObservationExpression(PatternExpression): @@ -184,4 +186,4 @@ class QualifiedObservationExpression(PatternExpression): self.qualifier = qualifier def __str__(self): - return str(self.observation_expression) + " " + str(self.qualifier) + return "%s %s" % (self.observation_expression, self.qualifier) diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index b86febb..49338df 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -56,3 +56,14 @@ def test_multiple_file_observable_expression(): op2_exp = stix2.ObservableExpression(exp3) exp = stix2.AndObservableExpression([op1_exp, op2_exp]) assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa + + +def test_root_types(): + ast = stix2.ObservableExpression( + stix2.AndBooleanExpression( + [stix2.ParentheticalExpression( + stix2.OrBooleanExpression([ + stix2.EqualityComparisonExpression(u"a:b", u"1"), + stix2.EqualityComparisonExpression(u"b:c", u"2")])), + stix2.EqualityComparisonExpression(u"b:d", u"3")])) + assert str(ast) == "[(a:b = '1' OR b:c = '2') AND b:d = '3']" From c1b07ef505b1d3644dce5e513aee46e1183ea9eb Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Fri, 7 Jul 2017 16:27:39 -0400 Subject: [PATCH 3/7] Introduce constant objects for literals in pattern expressions fixed idioms --- stix2/__init__.py | 11 ++- stix2/constants.py | 125 +++++++++++++++++++++++++ stix2/pattern_expressions.py | 26 +++-- stix2/test/test_pattern_expressions.py | 60 ++++++++---- 4 files changed, 195 insertions(+), 27 deletions(-) create mode 100644 stix2/constants.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 1974118..a1eaa56 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -4,6 +4,8 @@ from . import exceptions from .bundle import Bundle +from .constants import (FloatConstant, HashConstant, IntegerConstant, + StringConstant) from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, @@ -25,12 +27,17 @@ from .pattern_expressions import (AndBooleanExpression, ComparisonExpression, EqualityComparisonExpression, FollowedByObservableExpression, + GreaterThanComparisonExpression, + GreaterThanEqualComparisonExpression, + LessThanComparisonExpression, + LessThanEqualComparisonExpression, + LikeComparisonExpression, MatchesComparisonExpression, - ObservableExpression, - OrBooleanExpression, + ObservableExpression, OrBooleanExpression, OrObservableExpression, ParentheticalExpression, QualifiedObservationExpression, + RepeatQualifier, StartStopQualifier, WithinQualifier) from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, diff --git a/stix2/constants.py b/stix2/constants.py new file mode 100644 index 0000000..fb84c19 --- /dev/null +++ b/stix2/constants.py @@ -0,0 +1,125 @@ +import base64 +import binascii +import re + +# TODO: REConstant? +# TODO: Timestamp + + +class Constant(object): + def __str__(self): + return "%s" % self.value + + @staticmethod + def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class StringConstant(Constant): + def __init__(self, value): + self.value = value + + def __str__(self): + return "'%s'" % StringConstant.escape_quotes_and_backslashes(self.value) + + +class IntegerConstant(Constant): + def __init__(self, value): + try: + self.value = int(value) + except Exception: + raise ValueError("must be an integer.") + + +class FloatConstant(Constant): + def __init__(self, value): + try: + self.value = float(value) + except Exception: + raise ValueError("must be an float.") + + +class BooleanConstant(Constant): + def __init__(self, value): + if isinstance(value, bool): + self.value = value + + trues = ['true', 't'] + falses = ['false', 'f'] + try: + if value.lower() in trues: + self.value = True + if value.lower() in falses: + self.value = False + except AttributeError: + if value == 1: + self.value = True + if value == 0: + self.value = False + + raise ValueError("must be a boolean value.") + + +_HASH_REGEX = { + "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), + "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), + "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), + "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), + "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), + "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), + "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), + "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), + "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), + "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), + "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), + "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), + "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), + "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), +} + + +class HashConstant(StringConstant): + def __init__(self, value, type): + key = type.upper().replace('-', '') + if key in _HASH_REGEX: + vocab_key = _HASH_REGEX[key][1] + if not re.match(_HASH_REGEX[key][0], value): + raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) + self.value = value + + +class BinaryConstant(Constant): + + def __init__(self, value): + try: + base64.b64decode(value) + self.value = value + except (binascii.Error, TypeError): + raise ValueError("must contain a base64 encoded string") + + def __str__(self): + return "b'%s'" % self.value + + +class HexConstant(Constant): + + def __init__(self, value): + if not re.match('^([a-fA-F0-9]{2})+$', value): + raise ValueError("must contain an even number of hexadecimal characters") + self.value = value + + def __str__(self): + return "h'%s'" % self.value + + +class ListConstant(Constant): + def __init__(self, values): + self.value = values + + def __str__(self): + return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" + + +def make_constant(value): + # TODO: Stub + pass diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py index e952db9..12aadc2 100644 --- a/stix2/pattern_expressions.py +++ b/stix2/pattern_expressions.py @@ -1,3 +1,5 @@ +from .constants import ListConstant, make_constant + class PatternExpression(object): @@ -13,24 +15,30 @@ class PatternExpression(object): class ComparisonExpression(PatternExpression): def __init__(self, operator, lhs, rhs, negated=False): - if operator == "=" and isinstance(rhs, list): + if operator == "=" and isinstance(rhs, ListConstant): self.operator = "IN" else: self.operator = operator self.lhs = lhs - self.rhs = rhs + if isinstance(rhs, str): + self.rhs = make_constant(rhs) + else: + self.rhs = rhs self.negated = negated self.root_type = self.get_root_from_object_path(lhs) def __str__(self): - if isinstance(self.rhs, list): - final_rhs = [] - for r in self.rhs: - final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") - rhs_string = "(" + ", ".join(final_rhs) + ")" + # if isinstance(self.rhs, list): + # final_rhs = [] + # for r in self.rhs: + # final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") + # rhs_string = "(" + ", ".join(final_rhs) + ")" + # else: + # rhs_string = self.rhs + if self.negated: + return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) else: - rhs_string = "'" + self.escape_quotes_and_backslashes("%s" % self.rhs) + "'" - return self.lhs + (" NOT" if self.negated else "") + " " + self.operator + " " + rhs_string + return "%s %s %s" % (self.lhs, self.operator, self.rhs) class EqualityComparisonExpression(ComparisonExpression): diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index 49338df..fdfb1ee 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -3,42 +3,49 @@ import stix2 def test_create_comparison_expression(): - exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + stix2.HashConstant("aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "SHA-256")) # noqa assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'" def test_boolean_expression(): - exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$") - exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$") + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", + stix2.StringConstant(".+\\@example\\.com$")) + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", + stix2.StringConstant("^Final Report.+\\.exe$")) exp = stix2.AndBooleanExpression([exp1, exp2]) assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa def test_boolean_expression_with_parentheses(): - exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$") - exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$") + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", + stix2.StringConstant(".+\\@example\\.com$")) + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", + stix2.StringConstant("^Final Report.+\\.exe$")) exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2])) assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa def test_hash_followed_by_registryKey_expression(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", - "79054025255fb1a26e4bc422aef54eb4") + stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) o_exp1 = stix2.ObservableExpression(hash_exp) reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key", - "HKEY_LOCAL_MACHINE\\foo\\bar") + stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) o_exp2 = stix2.ObservableExpression(reg_exp) fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) - qual_exp = stix2.WithinQualifier(300) + qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa def test_file_observable_expression(): exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", - "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") - exp2 = stix2.EqualityComparisonExpression("file:mime_type", "application/x-pdf") + stix2.HashConstant( + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", + 'SHA-256')) + exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf")) bool_exp = stix2.AndBooleanExpression([exp1, exp2]) exp = stix2.ObservableExpression(bool_exp) assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa @@ -46,12 +53,16 @@ def test_file_observable_expression(): def test_multiple_file_observable_expression(): exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", - "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c") + stix2.HashConstant( + "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c", + 'SHA-256')) exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5", - "cead3f77f6cda6ec00f57d76c9a6879f") + stix2.HashConstant("cead3f77f6cda6ec00f57d76c9a6879f", "MD5")) bool1_exp = stix2.OrBooleanExpression([exp1, exp2]) exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", - "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f") + stix2.HashConstant( + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", + 'SHA-256')) op1_exp = stix2.ObservableExpression(bool1_exp) op2_exp = stix2.ObservableExpression(exp3) exp = stix2.AndObservableExpression([op1_exp, op2_exp]) @@ -63,7 +74,24 @@ def test_root_types(): stix2.AndBooleanExpression( [stix2.ParentheticalExpression( stix2.OrBooleanExpression([ - stix2.EqualityComparisonExpression(u"a:b", u"1"), - stix2.EqualityComparisonExpression(u"b:c", u"2")])), - stix2.EqualityComparisonExpression(u"b:d", u"3")])) + stix2.EqualityComparisonExpression("a:b", stix2.StringConstant("1")), + stix2.EqualityComparisonExpression("b:c", stix2.StringConstant("2"))])), + stix2.EqualityComparisonExpression(u"b:d", stix2.StringConstant("3"))])) assert str(ast) == "[(a:b = '1' OR b:c = '2') AND b:d = '3']" + + +def test_artifact_payload(): + exp1 = stix2.EqualityComparisonExpression("artifact:mime_type", + stix2.StringConstant("application/vnd.tcpdump.pcap")) + exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", + stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) + and_exp = stix2.AndBooleanExpression([exp1, exp2]) + exp = stix2.ObservableExpression(and_exp) + assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa + + +def test_greater_than(): + exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", + stix2.FloatConstant(7.0)) + exp = stix2.ObservableExpression(exp1) + assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" From 6fa009e5094393be296ff1cfeecfa907e381ef0d Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Wed, 12 Jul 2017 17:02:51 -0400 Subject: [PATCH 4/7] added object_paths added more tests for pattern expressions added "set" comparison expressions implemented make_constant fixed type name for EmailAddress --- stix2/__init__.py | 7 ++- stix2/constants.py | 14 ++++- stix2/object_path.py | 59 +++++++++++++++++++ stix2/observables.py | 4 +- stix2/pattern_expressions.py | 56 +++++++++++++----- stix2/test/test_observed_data.py | 4 +- stix2/test/test_pattern_expressions.py | 79 +++++++++++++++++++++++++- 7 files changed, 197 insertions(+), 26 deletions(-) create mode 100644 stix2/object_path.py diff --git a/stix2/__init__.py b/stix2/__init__.py index a1eaa56..637672a 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -4,8 +4,9 @@ from . import exceptions from .bundle import Bundle -from .constants import (FloatConstant, HashConstant, IntegerConstant, - StringConstant) +from .constants import (FloatConstant, HashConstant, HexConstant, + IntegerConstant, StringConstant) +from .object_path import ObjectPath, ObjectPathComponent from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, @@ -29,6 +30,8 @@ from .pattern_expressions import (AndBooleanExpression, FollowedByObservableExpression, GreaterThanComparisonExpression, GreaterThanEqualComparisonExpression, + IsSubsetComparisonExpression, + IsSupersetComparisonExpression, LessThanComparisonExpression, LessThanEqualComparisonExpression, LikeComparisonExpression, diff --git a/stix2/constants.py b/stix2/constants.py index fb84c19..142e9d7 100644 --- a/stix2/constants.py +++ b/stix2/constants.py @@ -121,5 +121,15 @@ class ListConstant(Constant): def make_constant(value): - # TODO: Stub - pass + if isinstance(value, str): + return StringConstant(value) + elif isinstance(value, int): + return IntegerConstant(value) + elif isinstance(value, float): + return FloatConstant(value) + elif isinstance(value, list): + return ListConstant(value) + elif isinstance(value, bool): + return BooleanConstant(value) + else: + raise ValueError("Unable to create a constant from %s" % value) diff --git a/stix2/object_path.py b/stix2/object_path.py new file mode 100644 index 0000000..22ebef7 --- /dev/null +++ b/stix2/object_path.py @@ -0,0 +1,59 @@ + +class ObjectPathComponent(object): + pass + + +class BasicObjectPathComponent(ObjectPathComponent): + def __init__(self, property_name, is_key=False): + self.property_name = property_name + # TODO: set is_key to True if this component is a dictionary key + # self.is_key = is_key + + def __str__(self): + return self.property_name + + +class ListObjectPathComponent(ObjectPathComponent): + def __init__(self, property_name, index): + self.property_name = property_name + self.index = index + + def __str__(self): + return "%s[%s]" % (self.property_name, self.index) + + +class ReferenceObjectPathComponent(ObjectPathComponent): + def __init__(self, reference_property_name): + self.property_name = reference_property_name + + def __str__(self): + return self.property_name + + +class ObjectPath(object): + def __init__(self, object_type_name, property_path): + self.object_type_name = object_type_name + self.property_path = [x if isinstance(x, ObjectPathComponent) else ObjectPath.create_ObjectPathComponent(x) + for x in property_path] + + def __str__(self): + return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) + + def merge(self, other): + self.property_path.extend(other.property_path) + return self + + @staticmethod + def make_object_path(lhs): + path_as_parts = lhs.split(":") + return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) + + @staticmethod + def create_ObjectPathComponent(component_name): + if component_name.endswith("_ref"): + return ReferenceObjectPathComponent(component_name) + elif component_name.find("[") != -1: + parse1 = component_name.split("[") + return ListObjectPathComponent(parse1[0], parse1[1][:-1]) + else: + return BasicObjectPathComponent(component_name) diff --git a/stix2/observables.py b/stix2/observables.py index e38e298..366e007 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -123,7 +123,7 @@ class DomainName(_Observable): class EmailAddress(_Observable): - _type = 'email-address' + _type = 'email-addr' _properties = { 'type': TypeProperty(_type), 'value': StringProperty(required=True), @@ -651,7 +651,7 @@ OBJ_MAP_OBSERVABLE = { 'autonomous-system': AutonomousSystem, 'directory': Directory, 'domain-name': DomainName, - 'email-address': EmailAddress, + 'email-addr': EmailAddress, 'email-message': EmailMessage, 'file': File, 'ipv4-addr': IPv4Address, diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py index 12aadc2..6804389 100644 --- a/stix2/pattern_expressions.py +++ b/stix2/pattern_expressions.py @@ -1,13 +1,9 @@ -from .constants import ListConstant, make_constant +from .constants import Constant, IntegerConstant, ListConstant, make_constant +from .object_path import ObjectPath class PatternExpression(object): - @staticmethod - def get_root_from_object_path(lhs): - path_as_parts = lhs.split(":") - return path_as_parts[0] - @staticmethod def escape_quotes_and_backslashes(s): return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") @@ -19,13 +15,16 @@ class ComparisonExpression(PatternExpression): self.operator = "IN" else: self.operator = operator - self.lhs = lhs - if isinstance(rhs, str): - self.rhs = make_constant(rhs) + if isinstance(lhs, ObjectPath): + self.lhs = lhs else: + self.lhs = ObjectPath.make_object_path(lhs) + if isinstance(rhs, Constant): self.rhs = rhs + else: + self.rhs = make_constant(rhs) self.negated = negated - self.root_type = self.get_root_from_object_path(lhs) + self.root_type = self.lhs.object_type_name def __str__(self): # if isinstance(self.rhs, list): @@ -81,7 +80,14 @@ class MatchesComparisonExpression(ComparisonExpression): super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) -# TODO: ISASUBSET, ISSUPERSET +class IsSubsetComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) + + +class IsSupersetComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) class BooleanExpression(PatternExpression): @@ -165,7 +171,12 @@ class ExpressionQualifier(PatternExpression): class RepeatQualifier(ExpressionQualifier): def __init__(self, times_to_repeat): - self.times_to_repeat = times_to_repeat + if isinstance(times_to_repeat, IntegerConstant): + self.times_to_repeat = times_to_repeat + elif isinstance(times_to_repeat, int): + self.times_to_repeat = IntegerConstant(times_to_repeat) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) def __str__(self): return "REPEATS %s TIMES" % self.times_to_repeat @@ -173,7 +184,12 @@ class RepeatQualifier(ExpressionQualifier): class WithinQualifier(ExpressionQualifier): def __init__(self, number_of_seconds): - self.number_of_seconds = number_of_seconds + if isinstance(number_of_seconds, IntegerConstant): + self.number_of_seconds = number_of_seconds + elif isinstance(number_of_seconds, int): + self.number_of_seconds = IntegerConstant(number_of_seconds) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) def __str__(self): return "WITHIN %s SECONDS" % self.number_of_seconds @@ -181,8 +197,18 @@ class WithinQualifier(ExpressionQualifier): class StartStopQualifier(ExpressionQualifier): def __init__(self, start_time, stop_time): - self.start_time = start_time - self.stop_time = stop_time + if isinstance(start_time, IntegerConstant): + self.start_time = start_time + elif isinstance(start_time, int): + self.start_time = IntegerConstant(start_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) + if isinstance(stop_time, IntegerConstant): + self.stop_time = stop_time + elif isinstance(stop_time, int): + self.stop_time = IntegerConstant(stop_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) def __str__(self): return "START %s STOP %s" % (self.start_time, self.stop_time) diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index 75f3070..d5641e7 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -225,7 +225,7 @@ def test_parse_autonomous_system_valid(data): @pytest.mark.parametrize("data", [ """{ - "type": "email-address", + "type": "email-addr", "value": "john@example.com", "display_name": "John Doe", "belongs_to_ref": "0" @@ -233,7 +233,7 @@ def test_parse_autonomous_system_valid(data): ]) def test_parse_email_address(data): odata = stix2.parse_observable(data, {"0": "user-account"}) - assert odata.type == "email-address" + assert odata.type == "email-addr" odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data) with pytest.raises(stix2.exceptions.InvalidObjRefError): diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index fdfb1ee..2e8495b 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -26,18 +26,32 @@ def test_boolean_expression_with_parentheses(): assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa +def test_hash_followed_by_registryKey_expression_python_constant(): + hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", + stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) + o_exp1 = stix2.ObservableExpression(hash_exp) + reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), + stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) + o_exp2 = stix2.ObservableExpression(reg_exp) + fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + para_exp = stix2.ParentheticalExpression(fb_exp) + qual_exp = stix2.WithinQualifier(300) + exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + + def test_hash_followed_by_registryKey_expression(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) o_exp1 = stix2.ObservableExpression(hash_exp) - reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key", + reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) o_exp2 = stix2.ObservableExpression(reg_exp) fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) - assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa def test_file_observable_expression(): @@ -82,7 +96,7 @@ def test_root_types(): def test_artifact_payload(): exp1 = stix2.EqualityComparisonExpression("artifact:mime_type", - stix2.StringConstant("application/vnd.tcpdump.pcap")) + "application/vnd.tcpdump.pcap") exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) and_exp = stix2.AndBooleanExpression([exp1, exp2]) @@ -90,8 +104,67 @@ def test_artifact_payload(): assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa +def test_greater_than_python_constant(): + exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", + 7.0) + exp = stix2.ObservableExpression(exp1) + assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" + + def test_greater_than(): exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", stix2.FloatConstant(7.0)) exp = stix2.ObservableExpression(exp1) assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" + + +def test_and_observable_expression(): + exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1007")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Peter")]) + exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1008")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Paul")]) + exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1009")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Mary")]) + exp = stix2.AndObservableExpression([stix2.ObservableExpression(exp1), + stix2.ObservableExpression(exp2), + stix2.ObservableExpression(exp3)]) + assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa + + +def test_hex(): + exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type", + "image/bmp"), + stix2.EqualityComparisonExpression("file:magic_number_hex", + stix2.HexConstant("ffd8"))]) + exp = stix2.ObservableExpression(exp_and) + assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" + + +def test_multiple_qualifiers(): + exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type", + "domain-name"), + stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", + "example.com")]) + exp_ob = stix2.ObservableExpression(exp_and) + qual_rep = stix2.RepeatQualifier(5) + qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) + exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) + assert str(exp) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" # noqa + + +def test_set_op(): + exp = stix2.ObservableExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64")) + assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']" From 27af0c0d5bea06c8de72dfeaf629e8719fbc8733 Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Thu, 13 Jul 2017 12:35:45 -0400 Subject: [PATCH 5/7] Fixed style errors --- stix2/test/test_pattern_expressions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index 2e8495b..717c185 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -80,7 +80,7 @@ def test_multiple_file_observable_expression(): op1_exp = stix2.ObservableExpression(bool1_exp) op2_exp = stix2.ObservableExpression(exp3) exp = stix2.AndObservableExpression([op1_exp, op2_exp]) - assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa + assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa def test_root_types(): From 979c09d8c0429e7c474a442827c9dae00953b70a Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Thu, 13 Jul 2017 13:30:21 -0400 Subject: [PATCH 6/7] fixed import sorting error --- .pre-commit-config.yaml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3dba8ef..201a248 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,12 @@ - repo: https://github.com/pre-commit/pre-commit-hooks - sha: e626cd57090d8df0be21e4df0f4e55cc3511d6ab + sha: ea227f024bd89d638aea319c92806737e3375979 hooks: - id: trailing-whitespace - id: flake8 args: - --max-line-length=160 - id: check-merge-conflict - -- repo: https://github.com/FalconSocial/pre-commit-python-sorter - sha: 1.0.4 - hooks: - - id: python-import-sorter +- repo: https://github.com/FalconSocial/pre-commit-python-sorter + sha: b57843b0b874df1d16eb0bef00b868792cb245c2 + hooks: + - id: python-import-sorter From a2aacc5e206c95bbd79f8fc75d158973b95738bb Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Wed, 19 Jul 2017 09:39:17 -0400 Subject: [PATCH 7/7] merge all classes into patterns.py --- stix2/__init__.py | 39 +-- stix2/constants.py | 135 -------- stix2/object_path.py | 59 ---- stix2/pattern_expressions.py | 223 ------------- stix2/patterns.py | 419 +++++++++++++++++++++++++ stix2/test/test_pattern_expressions.py | 46 +-- 6 files changed, 460 insertions(+), 461 deletions(-) delete mode 100644 stix2/constants.py delete mode 100644 stix2/object_path.py delete mode 100644 stix2/pattern_expressions.py create mode 100644 stix2/patterns.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 637672a..a58ba13 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -4,9 +4,6 @@ from . import exceptions from .bundle import Bundle -from .constants import (FloatConstant, HashConstant, HexConstant, - IntegerConstant, StringConstant) -from .object_path import ObjectPath, ObjectPathComponent from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, @@ -23,25 +20,23 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) -from .pattern_expressions import (AndBooleanExpression, - AndObservableExpression, - ComparisonExpression, - EqualityComparisonExpression, - FollowedByObservableExpression, - GreaterThanComparisonExpression, - GreaterThanEqualComparisonExpression, - IsSubsetComparisonExpression, - IsSupersetComparisonExpression, - LessThanComparisonExpression, - LessThanEqualComparisonExpression, - LikeComparisonExpression, - MatchesComparisonExpression, - ObservableExpression, OrBooleanExpression, - OrObservableExpression, - ParentheticalExpression, - QualifiedObservationExpression, - RepeatQualifier, StartStopQualifier, - WithinQualifier) +from .patterns import (AndBooleanExpression, AndObservationExpression, + BasicObjectPathComponent, EqualityComparisonExpression, + FloatConstant, FollowedByObservationExpression, + GreaterThanComparisonExpression, + GreaterThanEqualComparisonExpression, HashConstant, + HexConstant, IntegerConstant, + IsSubsetComparisonExpression, + IsSupersetComparisonExpression, + LessThanComparisonExpression, + LessThanEqualComparisonExpression, + LikeComparisonExpression, ListConstant, + ListObjectPathComponent, MatchesComparisonExpression, + ObjectPath, ObservationExpression, OrBooleanExpression, + OrObservationExpression, ParentheticalExpression, + QualifiedObservationExpression, + ReferenceObjectPathComponent, RepeatQualifier, + StartStopQualifier, StringConstant, WithinQualifier) from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) diff --git a/stix2/constants.py b/stix2/constants.py deleted file mode 100644 index 142e9d7..0000000 --- a/stix2/constants.py +++ /dev/null @@ -1,135 +0,0 @@ -import base64 -import binascii -import re - -# TODO: REConstant? -# TODO: Timestamp - - -class Constant(object): - def __str__(self): - return "%s" % self.value - - @staticmethod - def escape_quotes_and_backslashes(s): - return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") - - -class StringConstant(Constant): - def __init__(self, value): - self.value = value - - def __str__(self): - return "'%s'" % StringConstant.escape_quotes_and_backslashes(self.value) - - -class IntegerConstant(Constant): - def __init__(self, value): - try: - self.value = int(value) - except Exception: - raise ValueError("must be an integer.") - - -class FloatConstant(Constant): - def __init__(self, value): - try: - self.value = float(value) - except Exception: - raise ValueError("must be an float.") - - -class BooleanConstant(Constant): - def __init__(self, value): - if isinstance(value, bool): - self.value = value - - trues = ['true', 't'] - falses = ['false', 'f'] - try: - if value.lower() in trues: - self.value = True - if value.lower() in falses: - self.value = False - except AttributeError: - if value == 1: - self.value = True - if value == 0: - self.value = False - - raise ValueError("must be a boolean value.") - - -_HASH_REGEX = { - "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), - "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), - "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), - "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), - "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), - "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), - "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), - "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), - "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), - "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), - "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), - "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), - "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), - "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), -} - - -class HashConstant(StringConstant): - def __init__(self, value, type): - key = type.upper().replace('-', '') - if key in _HASH_REGEX: - vocab_key = _HASH_REGEX[key][1] - if not re.match(_HASH_REGEX[key][0], value): - raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) - self.value = value - - -class BinaryConstant(Constant): - - def __init__(self, value): - try: - base64.b64decode(value) - self.value = value - except (binascii.Error, TypeError): - raise ValueError("must contain a base64 encoded string") - - def __str__(self): - return "b'%s'" % self.value - - -class HexConstant(Constant): - - def __init__(self, value): - if not re.match('^([a-fA-F0-9]{2})+$', value): - raise ValueError("must contain an even number of hexadecimal characters") - self.value = value - - def __str__(self): - return "h'%s'" % self.value - - -class ListConstant(Constant): - def __init__(self, values): - self.value = values - - def __str__(self): - return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" - - -def make_constant(value): - if isinstance(value, str): - return StringConstant(value) - elif isinstance(value, int): - return IntegerConstant(value) - elif isinstance(value, float): - return FloatConstant(value) - elif isinstance(value, list): - return ListConstant(value) - elif isinstance(value, bool): - return BooleanConstant(value) - else: - raise ValueError("Unable to create a constant from %s" % value) diff --git a/stix2/object_path.py b/stix2/object_path.py deleted file mode 100644 index 22ebef7..0000000 --- a/stix2/object_path.py +++ /dev/null @@ -1,59 +0,0 @@ - -class ObjectPathComponent(object): - pass - - -class BasicObjectPathComponent(ObjectPathComponent): - def __init__(self, property_name, is_key=False): - self.property_name = property_name - # TODO: set is_key to True if this component is a dictionary key - # self.is_key = is_key - - def __str__(self): - return self.property_name - - -class ListObjectPathComponent(ObjectPathComponent): - def __init__(self, property_name, index): - self.property_name = property_name - self.index = index - - def __str__(self): - return "%s[%s]" % (self.property_name, self.index) - - -class ReferenceObjectPathComponent(ObjectPathComponent): - def __init__(self, reference_property_name): - self.property_name = reference_property_name - - def __str__(self): - return self.property_name - - -class ObjectPath(object): - def __init__(self, object_type_name, property_path): - self.object_type_name = object_type_name - self.property_path = [x if isinstance(x, ObjectPathComponent) else ObjectPath.create_ObjectPathComponent(x) - for x in property_path] - - def __str__(self): - return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) - - def merge(self, other): - self.property_path.extend(other.property_path) - return self - - @staticmethod - def make_object_path(lhs): - path_as_parts = lhs.split(":") - return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) - - @staticmethod - def create_ObjectPathComponent(component_name): - if component_name.endswith("_ref"): - return ReferenceObjectPathComponent(component_name) - elif component_name.find("[") != -1: - parse1 = component_name.split("[") - return ListObjectPathComponent(parse1[0], parse1[1][:-1]) - else: - return BasicObjectPathComponent(component_name) diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py deleted file mode 100644 index 6804389..0000000 --- a/stix2/pattern_expressions.py +++ /dev/null @@ -1,223 +0,0 @@ -from .constants import Constant, IntegerConstant, ListConstant, make_constant -from .object_path import ObjectPath - - -class PatternExpression(object): - - @staticmethod - def escape_quotes_and_backslashes(s): - return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") - - -class ComparisonExpression(PatternExpression): - def __init__(self, operator, lhs, rhs, negated=False): - if operator == "=" and isinstance(rhs, ListConstant): - self.operator = "IN" - else: - self.operator = operator - if isinstance(lhs, ObjectPath): - self.lhs = lhs - else: - self.lhs = ObjectPath.make_object_path(lhs) - if isinstance(rhs, Constant): - self.rhs = rhs - else: - self.rhs = make_constant(rhs) - self.negated = negated - self.root_type = self.lhs.object_type_name - - def __str__(self): - # if isinstance(self.rhs, list): - # final_rhs = [] - # for r in self.rhs: - # final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") - # rhs_string = "(" + ", ".join(final_rhs) + ")" - # else: - # rhs_string = self.rhs - if self.negated: - return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) - else: - return "%s %s %s" % (self.lhs, self.operator, self.rhs) - - -class EqualityComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) - - -class GreaterThanComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) - - -class LessThanComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) - - -class GreaterThanEqualComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(GreaterThanComparisonExpression, self).__init__(">=", lhs, rhs, negated) - - -class LessThanEqualComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(LessThanComparisonExpression, self).__init__("<=", lhs, rhs, negated) - - -class InComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) - - -class LikeComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) - - -class MatchesComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) - - -class IsSubsetComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) - - -class IsSupersetComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) - - -class BooleanExpression(PatternExpression): - def __init__(self, operator, operands): - self.operator = operator - self.operands = [] - for arg in operands: - if not hasattr(self, "root_type"): - self.root_type = arg.root_type - elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": - raise ValueError("This expression cannot have a mixed root type") - elif self.root_type and (self.root_type != arg.root_type): - self.root_type = None - self.operands.append(arg) - - def __str__(self): - sub_exprs = [] - for o in self.operands: - sub_exprs.append("%s" % o) - return (" " + self.operator + " ").join(sub_exprs) - - -class AndBooleanExpression(BooleanExpression): - def __init__(self, operands): - super(AndBooleanExpression, self).__init__("AND", operands) - - -class OrBooleanExpression(BooleanExpression): - def __init__(self, operands): - super(OrBooleanExpression, self).__init__("OR", operands) - - -class ObservableExpression(PatternExpression): - def __init__(self, operand): - self.operand = operand - - def __str__(self): - return "[%s]" % self.operand - - -class CompoundObservableExpression(PatternExpression): - def __init__(self, operator, operands): - self.operator = operator - self.operands = operands - - def __str__(self): - sub_exprs = [] - for o in self.operands: - sub_exprs.append("%s" % o) - return (" " + self.operator + " ").join(sub_exprs) - - -class AndObservableExpression(CompoundObservableExpression): - def __init__(self, operands): - super(AndObservableExpression, self).__init__("AND", operands) - - -class OrObservableExpression(CompoundObservableExpression): - def __init__(self, operands): - super(OrObservableExpression, self).__init__("OR", operands) - - -class FollowedByObservableExpression(CompoundObservableExpression): - def __init__(self, operands): - super(FollowedByObservableExpression, self).__init__("FOLLOWEDBY", operands) - - -class ParentheticalExpression(PatternExpression): - def __init__(self, exp): - self.expression = exp - if hasattr(exp, "root_type"): - self.root_type = exp.root_type - - def __str__(self): - return "(%s)" % self.expression - - -class ExpressionQualifier(PatternExpression): - pass - - -class RepeatQualifier(ExpressionQualifier): - def __init__(self, times_to_repeat): - if isinstance(times_to_repeat, IntegerConstant): - self.times_to_repeat = times_to_repeat - elif isinstance(times_to_repeat, int): - self.times_to_repeat = IntegerConstant(times_to_repeat) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) - - def __str__(self): - return "REPEATS %s TIMES" % self.times_to_repeat - - -class WithinQualifier(ExpressionQualifier): - def __init__(self, number_of_seconds): - if isinstance(number_of_seconds, IntegerConstant): - self.number_of_seconds = number_of_seconds - elif isinstance(number_of_seconds, int): - self.number_of_seconds = IntegerConstant(number_of_seconds) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) - - def __str__(self): - return "WITHIN %s SECONDS" % self.number_of_seconds - - -class StartStopQualifier(ExpressionQualifier): - def __init__(self, start_time, stop_time): - if isinstance(start_time, IntegerConstant): - self.start_time = start_time - elif isinstance(start_time, int): - self.start_time = IntegerConstant(start_time) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) - if isinstance(stop_time, IntegerConstant): - self.stop_time = stop_time - elif isinstance(stop_time, int): - self.stop_time = IntegerConstant(stop_time) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) - - def __str__(self): - return "START %s STOP %s" % (self.start_time, self.stop_time) - - -class QualifiedObservationExpression(PatternExpression): - def __init__(self, observation_expression, qualifier): - self.observation_expression = observation_expression - self.qualifier = qualifier - - def __str__(self): - return "%s %s" % (self.observation_expression, self.qualifier) diff --git a/stix2/patterns.py b/stix2/patterns.py new file mode 100644 index 0000000..d861144 --- /dev/null +++ b/stix2/patterns.py @@ -0,0 +1,419 @@ +import base64 +import binascii +import re + + +def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class _Constant(object): + pass + + +class StringConstant(_Constant): + def __init__(self, value): + self.value = value + + def __str__(self): + return "'%s'" % escape_quotes_and_backslashes(self.value) + + +class IntegerConstant(_Constant): + def __init__(self, value): + try: + self.value = int(value) + except Exception: + raise ValueError("must be an integer.") + + def __str__(self): + return "%s" % self.value + + +class FloatConstant(_Constant): + def __init__(self, value): + try: + self.value = float(value) + except Exception: + raise ValueError("must be an float.") + + def __str__(self): + return "%s" % self.value + + +class BooleanConstant(_Constant): + def __init__(self, value): + if isinstance(value, bool): + self.value = value + + trues = ['true', 't'] + falses = ['false', 'f'] + try: + if value.lower() in trues: + self.value = True + if value.lower() in falses: + self.value = False + except AttributeError: + if value == 1: + self.value = True + if value == 0: + self.value = False + + raise ValueError("must be a boolean value.") + + def __str__(self): + return "%s" % self.value + + +_HASH_REGEX = { + "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), + "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), + "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), + "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), + "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), + "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), + "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), + "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), + "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), + "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), + "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), + "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), + "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), + "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), +} + + +class HashConstant(StringConstant): + def __init__(self, value, type): + key = type.upper().replace('-', '') + if key in _HASH_REGEX: + vocab_key = _HASH_REGEX[key][1] + if not re.match(_HASH_REGEX[key][0], value): + raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) + self.value = value + + +class BinaryConstant(_Constant): + + def __init__(self, value): + try: + base64.b64decode(value) + self.value = value + except (binascii.Error, TypeError): + raise ValueError("must contain a base64 encoded string") + + def __str__(self): + return "b'%s'" % self.value + + +class HexConstant(_Constant): + def __init__(self, value): + if not re.match('^([a-fA-F0-9]{2})+$', value): + raise ValueError("must contain an even number of hexadecimal characters") + self.value = value + + def __str__(self): + return "h'%s'" % self.value + + +class ListConstant(_Constant): + def __init__(self, values): + self.value = values + + def __str__(self): + return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" + + +def make_constant(value): + if isinstance(value, str): + return StringConstant(value) + elif isinstance(value, int): + return IntegerConstant(value) + elif isinstance(value, float): + return FloatConstant(value) + elif isinstance(value, list): + return ListConstant(value) + elif isinstance(value, bool): + return BooleanConstant(value) + else: + raise ValueError("Unable to create a constant from %s" % value) + + +class _ObjectPathComponent(object): + @staticmethod + def create_ObjectPathComponent(component_name): + if component_name.endswith("_ref"): + return ReferenceObjectPathComponent(component_name) + elif component_name.find("[") != -1: + parse1 = component_name.split("[") + return ListObjectPathComponent(parse1[0], parse1[1][:-1]) + else: + return BasicObjectPathComponent(component_name) + + +class BasicObjectPathComponent(_ObjectPathComponent): + def __init__(self, property_name, is_key=False): + self.property_name = property_name + # TODO: set is_key to True if this component is a dictionary key + # self.is_key = is_key + + def __str__(self): + return self.property_name + + +class ListObjectPathComponent(_ObjectPathComponent): + def __init__(self, property_name, index): + self.property_name = property_name + self.index = index + + def __str__(self): + return "%s[%s]" % (self.property_name, self.index) + + +class ReferenceObjectPathComponent(_ObjectPathComponent): + def __init__(self, reference_property_name): + self.property_name = reference_property_name + + def __str__(self): + return self.property_name + + +class ObjectPath(object): + def __init__(self, object_type_name, property_path): + self.object_type_name = object_type_name + self.property_path = [x if isinstance(x, _ObjectPathComponent) else + _ObjectPathComponent.create_ObjectPathComponent(x) + for x in property_path] + + def __str__(self): + return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) + + def merge(self, other): + self.property_path.extend(other.property_path) + return self + + @staticmethod + def make_object_path(lhs): + path_as_parts = lhs.split(":") + return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) + + +class _PatternExpression(object): + + @staticmethod + def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class _ComparisonExpression(_PatternExpression): + def __init__(self, operator, lhs, rhs, negated=False): + if operator == "=" and isinstance(rhs, ListConstant): + self.operator = "IN" + else: + self.operator = operator + if isinstance(lhs, ObjectPath): + self.lhs = lhs + else: + self.lhs = ObjectPath.make_object_path(lhs) + if isinstance(rhs, _Constant): + self.rhs = rhs + else: + self.rhs = make_constant(rhs) + self.negated = negated + self.root_type = self.lhs.object_type_name + + def __str__(self): + # if isinstance(self.rhs, list): + # final_rhs = [] + # for r in self.rhs: + # final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") + # rhs_string = "(" + ", ".join(final_rhs) + ")" + # else: + # rhs_string = self.rhs + if self.negated: + return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) + else: + return "%s %s %s" % (self.lhs, self.operator, self.rhs) + + +class EqualityComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) + + +class GreaterThanComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) + + +class LessThanComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) + + +class GreaterThanEqualComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanEqualComparisonExpression, self).__init__(">=", lhs, rhs, negated) + + +class LessThanEqualComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanEqualComparisonExpression, self).__init__("<=", lhs, rhs, negated) + + +class InComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) + + +class LikeComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) + + +class MatchesComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) + + +class IsSubsetComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) + + +class IsSupersetComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) + + +class _BooleanExpression(_PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = [] + for arg in operands: + if not hasattr(self, "root_type"): + self.root_type = arg.root_type + elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": + raise ValueError("All operands to an 'AND' expression must have the same object type") + elif self.root_type and (self.root_type != arg.root_type): + self.root_type = None + self.operands.append(arg) + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append("%s" % o) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndBooleanExpression(_BooleanExpression): + def __init__(self, operands): + super(AndBooleanExpression, self).__init__("AND", operands) + + +class OrBooleanExpression(_BooleanExpression): + def __init__(self, operands): + super(OrBooleanExpression, self).__init__("OR", operands) + + +class ObservationExpression(_PatternExpression): + def __init__(self, operand): + self.operand = operand + + def __str__(self): + return "[%s]" % self.operand + + +class _CompoundObservationExpression(_PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = operands + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append("%s" % o) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(AndObservationExpression, self).__init__("AND", operands) + + +class OrObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(OrObservationExpression, self).__init__("OR", operands) + + +class FollowedByObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(FollowedByObservationExpression, self).__init__("FOLLOWEDBY", operands) + + +class ParentheticalExpression(_PatternExpression): + def __init__(self, exp): + self.expression = exp + if hasattr(exp, "root_type"): + self.root_type = exp.root_type + + def __str__(self): + return "(%s)" % self.expression + + +class _ExpressionQualifier(_PatternExpression): + pass + + +class RepeatQualifier(_ExpressionQualifier): + def __init__(self, times_to_repeat): + if isinstance(times_to_repeat, IntegerConstant): + self.times_to_repeat = times_to_repeat + elif isinstance(times_to_repeat, int): + self.times_to_repeat = IntegerConstant(times_to_repeat) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) + + def __str__(self): + return "REPEATS %s TIMES" % self.times_to_repeat + + +class WithinQualifier(_ExpressionQualifier): + def __init__(self, number_of_seconds): + if isinstance(number_of_seconds, IntegerConstant): + self.number_of_seconds = number_of_seconds + elif isinstance(number_of_seconds, int): + self.number_of_seconds = IntegerConstant(number_of_seconds) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) + + def __str__(self): + return "WITHIN %s SECONDS" % self.number_of_seconds + + +class StartStopQualifier(_ExpressionQualifier): + def __init__(self, start_time, stop_time): + if isinstance(start_time, IntegerConstant): + self.start_time = start_time + elif isinstance(start_time, int): + self.start_time = IntegerConstant(start_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) + if isinstance(stop_time, IntegerConstant): + self.stop_time = stop_time + elif isinstance(stop_time, int): + self.stop_time = IntegerConstant(stop_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) + + def __str__(self): + return "START %s STOP %s" % (self.start_time, self.stop_time) + + +class QualifiedObservationExpression(_PatternExpression): + def __init__(self, observation_expression, qualifier): + self.observation_expression = observation_expression + self.qualifier = qualifier + + def __str__(self): + return "%s %s" % (self.observation_expression, self.qualifier) diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index 717c185..e806aa6 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -18,7 +18,9 @@ def test_boolean_expression(): def test_boolean_expression_with_parentheses(): - exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", + exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message", + [stix2.ReferenceObjectPathComponent("from_ref"), + stix2.BasicObjectPathComponent("value")]), stix2.StringConstant(".+\\@example\\.com$")) exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", stix2.StringConstant("^Final Report.+\\.exe$")) @@ -29,11 +31,11 @@ def test_boolean_expression_with_parentheses(): def test_hash_followed_by_registryKey_expression_python_constant(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) - o_exp1 = stix2.ObservableExpression(hash_exp) + o_exp1 = stix2.ObservationExpression(hash_exp) reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) - o_exp2 = stix2.ObservableExpression(reg_exp) - fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + o_exp2 = stix2.ObservationExpression(reg_exp) + fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) qual_exp = stix2.WithinQualifier(300) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) @@ -43,11 +45,11 @@ def test_hash_followed_by_registryKey_expression_python_constant(): def test_hash_followed_by_registryKey_expression(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) - o_exp1 = stix2.ObservableExpression(hash_exp) + o_exp1 = stix2.ObservationExpression(hash_exp) reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) - o_exp2 = stix2.ObservableExpression(reg_exp) - fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + o_exp2 = stix2.ObservationExpression(reg_exp) + fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) @@ -61,7 +63,7 @@ def test_file_observable_expression(): 'SHA-256')) exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf")) bool_exp = stix2.AndBooleanExpression([exp1, exp2]) - exp = stix2.ObservableExpression(bool_exp) + exp = stix2.ObservationExpression(bool_exp) assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa @@ -77,14 +79,14 @@ def test_multiple_file_observable_expression(): stix2.HashConstant( "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", 'SHA-256')) - op1_exp = stix2.ObservableExpression(bool1_exp) - op2_exp = stix2.ObservableExpression(exp3) - exp = stix2.AndObservableExpression([op1_exp, op2_exp]) + op1_exp = stix2.ObservationExpression(bool1_exp) + op2_exp = stix2.ObservationExpression(exp3) + exp = stix2.AndObservationExpression([op1_exp, op2_exp]) assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa def test_root_types(): - ast = stix2.ObservableExpression( + ast = stix2.ObservationExpression( stix2.AndBooleanExpression( [stix2.ParentheticalExpression( stix2.OrBooleanExpression([ @@ -100,21 +102,21 @@ def test_artifact_payload(): exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) and_exp = stix2.AndBooleanExpression([exp1, exp2]) - exp = stix2.ObservableExpression(and_exp) + exp = stix2.ObservationExpression(and_exp) assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa def test_greater_than_python_constant(): exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", 7.0) - exp = stix2.ObservableExpression(exp1) + exp = stix2.ObservationExpression(exp1) assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" def test_greater_than(): exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", stix2.FloatConstant(7.0)) - exp = stix2.ObservableExpression(exp1) + exp = stix2.ObservationExpression(exp1) assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" @@ -137,9 +139,9 @@ def test_and_observable_expression(): stix2.StringConstant("1009")), stix2.EqualityComparisonExpression("user-account:account_login", "Mary")]) - exp = stix2.AndObservableExpression([stix2.ObservableExpression(exp1), - stix2.ObservableExpression(exp2), - stix2.ObservableExpression(exp3)]) + exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1), + stix2.ObservationExpression(exp2), + stix2.ObservationExpression(exp3)]) assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa @@ -148,7 +150,7 @@ def test_hex(): "image/bmp"), stix2.EqualityComparisonExpression("file:magic_number_hex", stix2.HexConstant("ffd8"))]) - exp = stix2.ObservableExpression(exp_and) + exp = stix2.ObservationExpression(exp_and) assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" @@ -157,7 +159,7 @@ def test_multiple_qualifiers(): "domain-name"), stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", "example.com")]) - exp_ob = stix2.ObservableExpression(exp_and) + exp_ob = stix2.ObservationExpression(exp_and) qual_rep = stix2.RepeatQualifier(5) qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) @@ -165,6 +167,6 @@ def test_multiple_qualifiers(): def test_set_op(): - exp = stix2.ObservableExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", - "2001:0db8:dead:beef:0000:0000:0000:0000/64")) + exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64")) assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"