From a2aacc5e206c95bbd79f8fc75d158973b95738bb Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Wed, 19 Jul 2017 09:39:17 -0400 Subject: [PATCH] merge all classes into patterns.py --- stix2/__init__.py | 39 +-- stix2/constants.py | 135 -------- stix2/object_path.py | 59 ---- stix2/pattern_expressions.py | 223 ------------- stix2/patterns.py | 419 +++++++++++++++++++++++++ stix2/test/test_pattern_expressions.py | 46 +-- 6 files changed, 460 insertions(+), 461 deletions(-) delete mode 100644 stix2/constants.py delete mode 100644 stix2/object_path.py delete mode 100644 stix2/pattern_expressions.py create mode 100644 stix2/patterns.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 637672a..a58ba13 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -4,9 +4,6 @@ from . import exceptions from .bundle import Bundle -from .constants import (FloatConstant, HashConstant, HexConstant, - IntegerConstant, StringConstant) -from .object_path import ObjectPath, ObjectPathComponent from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, @@ -23,25 +20,23 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) -from .pattern_expressions import (AndBooleanExpression, - AndObservableExpression, - ComparisonExpression, - EqualityComparisonExpression, - FollowedByObservableExpression, - GreaterThanComparisonExpression, - GreaterThanEqualComparisonExpression, - IsSubsetComparisonExpression, - IsSupersetComparisonExpression, - LessThanComparisonExpression, - LessThanEqualComparisonExpression, - LikeComparisonExpression, - MatchesComparisonExpression, - ObservableExpression, OrBooleanExpression, - OrObservableExpression, - ParentheticalExpression, - QualifiedObservationExpression, - RepeatQualifier, StartStopQualifier, - WithinQualifier) +from .patterns import (AndBooleanExpression, AndObservationExpression, + BasicObjectPathComponent, EqualityComparisonExpression, + FloatConstant, FollowedByObservationExpression, + GreaterThanComparisonExpression, + GreaterThanEqualComparisonExpression, HashConstant, + HexConstant, IntegerConstant, + IsSubsetComparisonExpression, + IsSupersetComparisonExpression, + LessThanComparisonExpression, + LessThanEqualComparisonExpression, + LikeComparisonExpression, ListConstant, + ListObjectPathComponent, MatchesComparisonExpression, + ObjectPath, ObservationExpression, OrBooleanExpression, + OrObservationExpression, ParentheticalExpression, + QualifiedObservationExpression, + ReferenceObjectPathComponent, RepeatQualifier, + StartStopQualifier, StringConstant, WithinQualifier) from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) diff --git a/stix2/constants.py b/stix2/constants.py deleted file mode 100644 index 142e9d7..0000000 --- a/stix2/constants.py +++ /dev/null @@ -1,135 +0,0 @@ -import base64 -import binascii -import re - -# TODO: REConstant? -# TODO: Timestamp - - -class Constant(object): - def __str__(self): - return "%s" % self.value - - @staticmethod - def escape_quotes_and_backslashes(s): - return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") - - -class StringConstant(Constant): - def __init__(self, value): - self.value = value - - def __str__(self): - return "'%s'" % StringConstant.escape_quotes_and_backslashes(self.value) - - -class IntegerConstant(Constant): - def __init__(self, value): - try: - self.value = int(value) - except Exception: - raise ValueError("must be an integer.") - - -class FloatConstant(Constant): - def __init__(self, value): - try: - self.value = float(value) - except Exception: - raise ValueError("must be an float.") - - -class BooleanConstant(Constant): - def __init__(self, value): - if isinstance(value, bool): - self.value = value - - trues = ['true', 't'] - falses = ['false', 'f'] - try: - if value.lower() in trues: - self.value = True - if value.lower() in falses: - self.value = False - except AttributeError: - if value == 1: - self.value = True - if value == 0: - self.value = False - - raise ValueError("must be a boolean value.") - - -_HASH_REGEX = { - "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), - "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), - "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), - "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), - "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), - "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), - "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), - "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), - "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), - "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), - "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), - "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), - "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), - "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), -} - - -class HashConstant(StringConstant): - def __init__(self, value, type): - key = type.upper().replace('-', '') - if key in _HASH_REGEX: - vocab_key = _HASH_REGEX[key][1] - if not re.match(_HASH_REGEX[key][0], value): - raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) - self.value = value - - -class BinaryConstant(Constant): - - def __init__(self, value): - try: - base64.b64decode(value) - self.value = value - except (binascii.Error, TypeError): - raise ValueError("must contain a base64 encoded string") - - def __str__(self): - return "b'%s'" % self.value - - -class HexConstant(Constant): - - def __init__(self, value): - if not re.match('^([a-fA-F0-9]{2})+$', value): - raise ValueError("must contain an even number of hexadecimal characters") - self.value = value - - def __str__(self): - return "h'%s'" % self.value - - -class ListConstant(Constant): - def __init__(self, values): - self.value = values - - def __str__(self): - return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" - - -def make_constant(value): - if isinstance(value, str): - return StringConstant(value) - elif isinstance(value, int): - return IntegerConstant(value) - elif isinstance(value, float): - return FloatConstant(value) - elif isinstance(value, list): - return ListConstant(value) - elif isinstance(value, bool): - return BooleanConstant(value) - else: - raise ValueError("Unable to create a constant from %s" % value) diff --git a/stix2/object_path.py b/stix2/object_path.py deleted file mode 100644 index 22ebef7..0000000 --- a/stix2/object_path.py +++ /dev/null @@ -1,59 +0,0 @@ - -class ObjectPathComponent(object): - pass - - -class BasicObjectPathComponent(ObjectPathComponent): - def __init__(self, property_name, is_key=False): - self.property_name = property_name - # TODO: set is_key to True if this component is a dictionary key - # self.is_key = is_key - - def __str__(self): - return self.property_name - - -class ListObjectPathComponent(ObjectPathComponent): - def __init__(self, property_name, index): - self.property_name = property_name - self.index = index - - def __str__(self): - return "%s[%s]" % (self.property_name, self.index) - - -class ReferenceObjectPathComponent(ObjectPathComponent): - def __init__(self, reference_property_name): - self.property_name = reference_property_name - - def __str__(self): - return self.property_name - - -class ObjectPath(object): - def __init__(self, object_type_name, property_path): - self.object_type_name = object_type_name - self.property_path = [x if isinstance(x, ObjectPathComponent) else ObjectPath.create_ObjectPathComponent(x) - for x in property_path] - - def __str__(self): - return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) - - def merge(self, other): - self.property_path.extend(other.property_path) - return self - - @staticmethod - def make_object_path(lhs): - path_as_parts = lhs.split(":") - return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) - - @staticmethod - def create_ObjectPathComponent(component_name): - if component_name.endswith("_ref"): - return ReferenceObjectPathComponent(component_name) - elif component_name.find("[") != -1: - parse1 = component_name.split("[") - return ListObjectPathComponent(parse1[0], parse1[1][:-1]) - else: - return BasicObjectPathComponent(component_name) diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py deleted file mode 100644 index 6804389..0000000 --- a/stix2/pattern_expressions.py +++ /dev/null @@ -1,223 +0,0 @@ -from .constants import Constant, IntegerConstant, ListConstant, make_constant -from .object_path import ObjectPath - - -class PatternExpression(object): - - @staticmethod - def escape_quotes_and_backslashes(s): - return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") - - -class ComparisonExpression(PatternExpression): - def __init__(self, operator, lhs, rhs, negated=False): - if operator == "=" and isinstance(rhs, ListConstant): - self.operator = "IN" - else: - self.operator = operator - if isinstance(lhs, ObjectPath): - self.lhs = lhs - else: - self.lhs = ObjectPath.make_object_path(lhs) - if isinstance(rhs, Constant): - self.rhs = rhs - else: - self.rhs = make_constant(rhs) - self.negated = negated - self.root_type = self.lhs.object_type_name - - def __str__(self): - # if isinstance(self.rhs, list): - # final_rhs = [] - # for r in self.rhs: - # final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") - # rhs_string = "(" + ", ".join(final_rhs) + ")" - # else: - # rhs_string = self.rhs - if self.negated: - return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) - else: - return "%s %s %s" % (self.lhs, self.operator, self.rhs) - - -class EqualityComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) - - -class GreaterThanComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) - - -class LessThanComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) - - -class GreaterThanEqualComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(GreaterThanComparisonExpression, self).__init__(">=", lhs, rhs, negated) - - -class LessThanEqualComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(LessThanComparisonExpression, self).__init__("<=", lhs, rhs, negated) - - -class InComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) - - -class LikeComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) - - -class MatchesComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) - - -class IsSubsetComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) - - -class IsSupersetComparisonExpression(ComparisonExpression): - def __init__(self, lhs, rhs, negated=False): - super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) - - -class BooleanExpression(PatternExpression): - def __init__(self, operator, operands): - self.operator = operator - self.operands = [] - for arg in operands: - if not hasattr(self, "root_type"): - self.root_type = arg.root_type - elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": - raise ValueError("This expression cannot have a mixed root type") - elif self.root_type and (self.root_type != arg.root_type): - self.root_type = None - self.operands.append(arg) - - def __str__(self): - sub_exprs = [] - for o in self.operands: - sub_exprs.append("%s" % o) - return (" " + self.operator + " ").join(sub_exprs) - - -class AndBooleanExpression(BooleanExpression): - def __init__(self, operands): - super(AndBooleanExpression, self).__init__("AND", operands) - - -class OrBooleanExpression(BooleanExpression): - def __init__(self, operands): - super(OrBooleanExpression, self).__init__("OR", operands) - - -class ObservableExpression(PatternExpression): - def __init__(self, operand): - self.operand = operand - - def __str__(self): - return "[%s]" % self.operand - - -class CompoundObservableExpression(PatternExpression): - def __init__(self, operator, operands): - self.operator = operator - self.operands = operands - - def __str__(self): - sub_exprs = [] - for o in self.operands: - sub_exprs.append("%s" % o) - return (" " + self.operator + " ").join(sub_exprs) - - -class AndObservableExpression(CompoundObservableExpression): - def __init__(self, operands): - super(AndObservableExpression, self).__init__("AND", operands) - - -class OrObservableExpression(CompoundObservableExpression): - def __init__(self, operands): - super(OrObservableExpression, self).__init__("OR", operands) - - -class FollowedByObservableExpression(CompoundObservableExpression): - def __init__(self, operands): - super(FollowedByObservableExpression, self).__init__("FOLLOWEDBY", operands) - - -class ParentheticalExpression(PatternExpression): - def __init__(self, exp): - self.expression = exp - if hasattr(exp, "root_type"): - self.root_type = exp.root_type - - def __str__(self): - return "(%s)" % self.expression - - -class ExpressionQualifier(PatternExpression): - pass - - -class RepeatQualifier(ExpressionQualifier): - def __init__(self, times_to_repeat): - if isinstance(times_to_repeat, IntegerConstant): - self.times_to_repeat = times_to_repeat - elif isinstance(times_to_repeat, int): - self.times_to_repeat = IntegerConstant(times_to_repeat) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) - - def __str__(self): - return "REPEATS %s TIMES" % self.times_to_repeat - - -class WithinQualifier(ExpressionQualifier): - def __init__(self, number_of_seconds): - if isinstance(number_of_seconds, IntegerConstant): - self.number_of_seconds = number_of_seconds - elif isinstance(number_of_seconds, int): - self.number_of_seconds = IntegerConstant(number_of_seconds) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) - - def __str__(self): - return "WITHIN %s SECONDS" % self.number_of_seconds - - -class StartStopQualifier(ExpressionQualifier): - def __init__(self, start_time, stop_time): - if isinstance(start_time, IntegerConstant): - self.start_time = start_time - elif isinstance(start_time, int): - self.start_time = IntegerConstant(start_time) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) - if isinstance(stop_time, IntegerConstant): - self.stop_time = stop_time - elif isinstance(stop_time, int): - self.stop_time = IntegerConstant(stop_time) - else: - raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) - - def __str__(self): - return "START %s STOP %s" % (self.start_time, self.stop_time) - - -class QualifiedObservationExpression(PatternExpression): - def __init__(self, observation_expression, qualifier): - self.observation_expression = observation_expression - self.qualifier = qualifier - - def __str__(self): - return "%s %s" % (self.observation_expression, self.qualifier) diff --git a/stix2/patterns.py b/stix2/patterns.py new file mode 100644 index 0000000..d861144 --- /dev/null +++ b/stix2/patterns.py @@ -0,0 +1,419 @@ +import base64 +import binascii +import re + + +def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class _Constant(object): + pass + + +class StringConstant(_Constant): + def __init__(self, value): + self.value = value + + def __str__(self): + return "'%s'" % escape_quotes_and_backslashes(self.value) + + +class IntegerConstant(_Constant): + def __init__(self, value): + try: + self.value = int(value) + except Exception: + raise ValueError("must be an integer.") + + def __str__(self): + return "%s" % self.value + + +class FloatConstant(_Constant): + def __init__(self, value): + try: + self.value = float(value) + except Exception: + raise ValueError("must be an float.") + + def __str__(self): + return "%s" % self.value + + +class BooleanConstant(_Constant): + def __init__(self, value): + if isinstance(value, bool): + self.value = value + + trues = ['true', 't'] + falses = ['false', 'f'] + try: + if value.lower() in trues: + self.value = True + if value.lower() in falses: + self.value = False + except AttributeError: + if value == 1: + self.value = True + if value == 0: + self.value = False + + raise ValueError("must be a boolean value.") + + def __str__(self): + return "%s" % self.value + + +_HASH_REGEX = { + "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), + "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), + "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), + "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), + "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), + "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), + "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), + "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), + "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), + "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), + "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), + "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), + "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), + "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), +} + + +class HashConstant(StringConstant): + def __init__(self, value, type): + key = type.upper().replace('-', '') + if key in _HASH_REGEX: + vocab_key = _HASH_REGEX[key][1] + if not re.match(_HASH_REGEX[key][0], value): + raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) + self.value = value + + +class BinaryConstant(_Constant): + + def __init__(self, value): + try: + base64.b64decode(value) + self.value = value + except (binascii.Error, TypeError): + raise ValueError("must contain a base64 encoded string") + + def __str__(self): + return "b'%s'" % self.value + + +class HexConstant(_Constant): + def __init__(self, value): + if not re.match('^([a-fA-F0-9]{2})+$', value): + raise ValueError("must contain an even number of hexadecimal characters") + self.value = value + + def __str__(self): + return "h'%s'" % self.value + + +class ListConstant(_Constant): + def __init__(self, values): + self.value = values + + def __str__(self): + return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" + + +def make_constant(value): + if isinstance(value, str): + return StringConstant(value) + elif isinstance(value, int): + return IntegerConstant(value) + elif isinstance(value, float): + return FloatConstant(value) + elif isinstance(value, list): + return ListConstant(value) + elif isinstance(value, bool): + return BooleanConstant(value) + else: + raise ValueError("Unable to create a constant from %s" % value) + + +class _ObjectPathComponent(object): + @staticmethod + def create_ObjectPathComponent(component_name): + if component_name.endswith("_ref"): + return ReferenceObjectPathComponent(component_name) + elif component_name.find("[") != -1: + parse1 = component_name.split("[") + return ListObjectPathComponent(parse1[0], parse1[1][:-1]) + else: + return BasicObjectPathComponent(component_name) + + +class BasicObjectPathComponent(_ObjectPathComponent): + def __init__(self, property_name, is_key=False): + self.property_name = property_name + # TODO: set is_key to True if this component is a dictionary key + # self.is_key = is_key + + def __str__(self): + return self.property_name + + +class ListObjectPathComponent(_ObjectPathComponent): + def __init__(self, property_name, index): + self.property_name = property_name + self.index = index + + def __str__(self): + return "%s[%s]" % (self.property_name, self.index) + + +class ReferenceObjectPathComponent(_ObjectPathComponent): + def __init__(self, reference_property_name): + self.property_name = reference_property_name + + def __str__(self): + return self.property_name + + +class ObjectPath(object): + def __init__(self, object_type_name, property_path): + self.object_type_name = object_type_name + self.property_path = [x if isinstance(x, _ObjectPathComponent) else + _ObjectPathComponent.create_ObjectPathComponent(x) + for x in property_path] + + def __str__(self): + return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) + + def merge(self, other): + self.property_path.extend(other.property_path) + return self + + @staticmethod + def make_object_path(lhs): + path_as_parts = lhs.split(":") + return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) + + +class _PatternExpression(object): + + @staticmethod + def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class _ComparisonExpression(_PatternExpression): + def __init__(self, operator, lhs, rhs, negated=False): + if operator == "=" and isinstance(rhs, ListConstant): + self.operator = "IN" + else: + self.operator = operator + if isinstance(lhs, ObjectPath): + self.lhs = lhs + else: + self.lhs = ObjectPath.make_object_path(lhs) + if isinstance(rhs, _Constant): + self.rhs = rhs + else: + self.rhs = make_constant(rhs) + self.negated = negated + self.root_type = self.lhs.object_type_name + + def __str__(self): + # if isinstance(self.rhs, list): + # final_rhs = [] + # for r in self.rhs: + # final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") + # rhs_string = "(" + ", ".join(final_rhs) + ")" + # else: + # rhs_string = self.rhs + if self.negated: + return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) + else: + return "%s %s %s" % (self.lhs, self.operator, self.rhs) + + +class EqualityComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) + + +class GreaterThanComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) + + +class LessThanComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) + + +class GreaterThanEqualComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanEqualComparisonExpression, self).__init__(">=", lhs, rhs, negated) + + +class LessThanEqualComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanEqualComparisonExpression, self).__init__("<=", lhs, rhs, negated) + + +class InComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) + + +class LikeComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) + + +class MatchesComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) + + +class IsSubsetComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) + + +class IsSupersetComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) + + +class _BooleanExpression(_PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = [] + for arg in operands: + if not hasattr(self, "root_type"): + self.root_type = arg.root_type + elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": + raise ValueError("All operands to an 'AND' expression must have the same object type") + elif self.root_type and (self.root_type != arg.root_type): + self.root_type = None + self.operands.append(arg) + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append("%s" % o) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndBooleanExpression(_BooleanExpression): + def __init__(self, operands): + super(AndBooleanExpression, self).__init__("AND", operands) + + +class OrBooleanExpression(_BooleanExpression): + def __init__(self, operands): + super(OrBooleanExpression, self).__init__("OR", operands) + + +class ObservationExpression(_PatternExpression): + def __init__(self, operand): + self.operand = operand + + def __str__(self): + return "[%s]" % self.operand + + +class _CompoundObservationExpression(_PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = operands + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append("%s" % o) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(AndObservationExpression, self).__init__("AND", operands) + + +class OrObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(OrObservationExpression, self).__init__("OR", operands) + + +class FollowedByObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(FollowedByObservationExpression, self).__init__("FOLLOWEDBY", operands) + + +class ParentheticalExpression(_PatternExpression): + def __init__(self, exp): + self.expression = exp + if hasattr(exp, "root_type"): + self.root_type = exp.root_type + + def __str__(self): + return "(%s)" % self.expression + + +class _ExpressionQualifier(_PatternExpression): + pass + + +class RepeatQualifier(_ExpressionQualifier): + def __init__(self, times_to_repeat): + if isinstance(times_to_repeat, IntegerConstant): + self.times_to_repeat = times_to_repeat + elif isinstance(times_to_repeat, int): + self.times_to_repeat = IntegerConstant(times_to_repeat) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) + + def __str__(self): + return "REPEATS %s TIMES" % self.times_to_repeat + + +class WithinQualifier(_ExpressionQualifier): + def __init__(self, number_of_seconds): + if isinstance(number_of_seconds, IntegerConstant): + self.number_of_seconds = number_of_seconds + elif isinstance(number_of_seconds, int): + self.number_of_seconds = IntegerConstant(number_of_seconds) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) + + def __str__(self): + return "WITHIN %s SECONDS" % self.number_of_seconds + + +class StartStopQualifier(_ExpressionQualifier): + def __init__(self, start_time, stop_time): + if isinstance(start_time, IntegerConstant): + self.start_time = start_time + elif isinstance(start_time, int): + self.start_time = IntegerConstant(start_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) + if isinstance(stop_time, IntegerConstant): + self.stop_time = stop_time + elif isinstance(stop_time, int): + self.stop_time = IntegerConstant(stop_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) + + def __str__(self): + return "START %s STOP %s" % (self.start_time, self.stop_time) + + +class QualifiedObservationExpression(_PatternExpression): + def __init__(self, observation_expression, qualifier): + self.observation_expression = observation_expression + self.qualifier = qualifier + + def __str__(self): + return "%s %s" % (self.observation_expression, self.qualifier) diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index 717c185..e806aa6 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -18,7 +18,9 @@ def test_boolean_expression(): def test_boolean_expression_with_parentheses(): - exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", + exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message", + [stix2.ReferenceObjectPathComponent("from_ref"), + stix2.BasicObjectPathComponent("value")]), stix2.StringConstant(".+\\@example\\.com$")) exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", stix2.StringConstant("^Final Report.+\\.exe$")) @@ -29,11 +31,11 @@ def test_boolean_expression_with_parentheses(): def test_hash_followed_by_registryKey_expression_python_constant(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) - o_exp1 = stix2.ObservableExpression(hash_exp) + o_exp1 = stix2.ObservationExpression(hash_exp) reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) - o_exp2 = stix2.ObservableExpression(reg_exp) - fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + o_exp2 = stix2.ObservationExpression(reg_exp) + fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) qual_exp = stix2.WithinQualifier(300) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) @@ -43,11 +45,11 @@ def test_hash_followed_by_registryKey_expression_python_constant(): def test_hash_followed_by_registryKey_expression(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) - o_exp1 = stix2.ObservableExpression(hash_exp) + o_exp1 = stix2.ObservationExpression(hash_exp) reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) - o_exp2 = stix2.ObservableExpression(reg_exp) - fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + o_exp2 = stix2.ObservationExpression(reg_exp) + fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) @@ -61,7 +63,7 @@ def test_file_observable_expression(): 'SHA-256')) exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf")) bool_exp = stix2.AndBooleanExpression([exp1, exp2]) - exp = stix2.ObservableExpression(bool_exp) + exp = stix2.ObservationExpression(bool_exp) assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa @@ -77,14 +79,14 @@ def test_multiple_file_observable_expression(): stix2.HashConstant( "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", 'SHA-256')) - op1_exp = stix2.ObservableExpression(bool1_exp) - op2_exp = stix2.ObservableExpression(exp3) - exp = stix2.AndObservableExpression([op1_exp, op2_exp]) + op1_exp = stix2.ObservationExpression(bool1_exp) + op2_exp = stix2.ObservationExpression(exp3) + exp = stix2.AndObservationExpression([op1_exp, op2_exp]) assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa def test_root_types(): - ast = stix2.ObservableExpression( + ast = stix2.ObservationExpression( stix2.AndBooleanExpression( [stix2.ParentheticalExpression( stix2.OrBooleanExpression([ @@ -100,21 +102,21 @@ def test_artifact_payload(): exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) and_exp = stix2.AndBooleanExpression([exp1, exp2]) - exp = stix2.ObservableExpression(and_exp) + exp = stix2.ObservationExpression(and_exp) assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa def test_greater_than_python_constant(): exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", 7.0) - exp = stix2.ObservableExpression(exp1) + exp = stix2.ObservationExpression(exp1) assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" def test_greater_than(): exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", stix2.FloatConstant(7.0)) - exp = stix2.ObservableExpression(exp1) + exp = stix2.ObservationExpression(exp1) assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" @@ -137,9 +139,9 @@ def test_and_observable_expression(): stix2.StringConstant("1009")), stix2.EqualityComparisonExpression("user-account:account_login", "Mary")]) - exp = stix2.AndObservableExpression([stix2.ObservableExpression(exp1), - stix2.ObservableExpression(exp2), - stix2.ObservableExpression(exp3)]) + exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1), + stix2.ObservationExpression(exp2), + stix2.ObservationExpression(exp3)]) assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa @@ -148,7 +150,7 @@ def test_hex(): "image/bmp"), stix2.EqualityComparisonExpression("file:magic_number_hex", stix2.HexConstant("ffd8"))]) - exp = stix2.ObservableExpression(exp_and) + exp = stix2.ObservationExpression(exp_and) assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" @@ -157,7 +159,7 @@ def test_multiple_qualifiers(): "domain-name"), stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", "example.com")]) - exp_ob = stix2.ObservableExpression(exp_and) + exp_ob = stix2.ObservationExpression(exp_and) qual_rep = stix2.RepeatQualifier(5) qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) @@ -165,6 +167,6 @@ def test_multiple_qualifiers(): def test_set_op(): - exp = stix2.ObservableExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", - "2001:0db8:dead:beef:0000:0000:0000:0000/64")) + exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64")) assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"