diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3dba8ef..201a248 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,13 +1,12 @@ - repo: https://github.com/pre-commit/pre-commit-hooks - sha: e626cd57090d8df0be21e4df0f4e55cc3511d6ab + sha: ea227f024bd89d638aea319c92806737e3375979 hooks: - id: trailing-whitespace - id: flake8 args: - --max-line-length=160 - id: check-merge-conflict - -- repo: https://github.com/FalconSocial/pre-commit-python-sorter - sha: 1.0.4 - hooks: - - id: python-import-sorter +- repo: https://github.com/FalconSocial/pre-commit-python-sorter + sha: b57843b0b874df1d16eb0bef00b868792cb245c2 + hooks: + - id: python-import-sorter diff --git a/stix2/__init__.py b/stix2/__init__.py index ff933db..98697a9 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -21,6 +21,23 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, ExternalReference, GranularMarking, KillChainPhase, MarkingDefinition, StatementMarking, TLPMarking) +from .patterns import (AndBooleanExpression, AndObservationExpression, + BasicObjectPathComponent, EqualityComparisonExpression, + FloatConstant, FollowedByObservationExpression, + GreaterThanComparisonExpression, + GreaterThanEqualComparisonExpression, HashConstant, + HexConstant, IntegerConstant, + IsSubsetComparisonExpression, + IsSupersetComparisonExpression, + LessThanComparisonExpression, + LessThanEqualComparisonExpression, + LikeComparisonExpression, ListConstant, + ListObjectPathComponent, MatchesComparisonExpression, + ObjectPath, ObservationExpression, OrBooleanExpression, + OrObservationExpression, ParentheticalExpression, + QualifiedObservationExpression, + ReferenceObjectPathComponent, RepeatQualifier, + StartStopQualifier, StringConstant, WithinQualifier) from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, Identity, Indicator, IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, Vulnerability) diff --git a/stix2/observables.py b/stix2/observables.py index e38e298..366e007 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -123,7 +123,7 @@ class DomainName(_Observable): class EmailAddress(_Observable): - _type = 'email-address' + _type = 'email-addr' _properties = { 'type': TypeProperty(_type), 'value': StringProperty(required=True), @@ -651,7 +651,7 @@ OBJ_MAP_OBSERVABLE = { 'autonomous-system': AutonomousSystem, 'directory': Directory, 'domain-name': DomainName, - 'email-address': EmailAddress, + 'email-addr': EmailAddress, 'email-message': EmailMessage, 'file': File, 'ipv4-addr': IPv4Address, diff --git a/stix2/patterns.py b/stix2/patterns.py new file mode 100644 index 0000000..d861144 --- /dev/null +++ b/stix2/patterns.py @@ -0,0 +1,419 @@ +import base64 +import binascii +import re + + +def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class _Constant(object): + pass + + +class StringConstant(_Constant): + def __init__(self, value): + self.value = value + + def __str__(self): + return "'%s'" % escape_quotes_and_backslashes(self.value) + + +class IntegerConstant(_Constant): + def __init__(self, value): + try: + self.value = int(value) + except Exception: + raise ValueError("must be an integer.") + + def __str__(self): + return "%s" % self.value + + +class FloatConstant(_Constant): + def __init__(self, value): + try: + self.value = float(value) + except Exception: + raise ValueError("must be an float.") + + def __str__(self): + return "%s" % self.value + + +class BooleanConstant(_Constant): + def __init__(self, value): + if isinstance(value, bool): + self.value = value + + trues = ['true', 't'] + falses = ['false', 'f'] + try: + if value.lower() in trues: + self.value = True + if value.lower() in falses: + self.value = False + except AttributeError: + if value == 1: + self.value = True + if value == 0: + self.value = False + + raise ValueError("must be a boolean value.") + + def __str__(self): + return "%s" % self.value + + +_HASH_REGEX = { + "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), + "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), + "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), + "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), + "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), + "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), + "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), + "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), + "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), + "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), + "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), + "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), + "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), + "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), +} + + +class HashConstant(StringConstant): + def __init__(self, value, type): + key = type.upper().replace('-', '') + if key in _HASH_REGEX: + vocab_key = _HASH_REGEX[key][1] + if not re.match(_HASH_REGEX[key][0], value): + raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) + self.value = value + + +class BinaryConstant(_Constant): + + def __init__(self, value): + try: + base64.b64decode(value) + self.value = value + except (binascii.Error, TypeError): + raise ValueError("must contain a base64 encoded string") + + def __str__(self): + return "b'%s'" % self.value + + +class HexConstant(_Constant): + def __init__(self, value): + if not re.match('^([a-fA-F0-9]{2})+$', value): + raise ValueError("must contain an even number of hexadecimal characters") + self.value = value + + def __str__(self): + return "h'%s'" % self.value + + +class ListConstant(_Constant): + def __init__(self, values): + self.value = values + + def __str__(self): + return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" + + +def make_constant(value): + if isinstance(value, str): + return StringConstant(value) + elif isinstance(value, int): + return IntegerConstant(value) + elif isinstance(value, float): + return FloatConstant(value) + elif isinstance(value, list): + return ListConstant(value) + elif isinstance(value, bool): + return BooleanConstant(value) + else: + raise ValueError("Unable to create a constant from %s" % value) + + +class _ObjectPathComponent(object): + @staticmethod + def create_ObjectPathComponent(component_name): + if component_name.endswith("_ref"): + return ReferenceObjectPathComponent(component_name) + elif component_name.find("[") != -1: + parse1 = component_name.split("[") + return ListObjectPathComponent(parse1[0], parse1[1][:-1]) + else: + return BasicObjectPathComponent(component_name) + + +class BasicObjectPathComponent(_ObjectPathComponent): + def __init__(self, property_name, is_key=False): + self.property_name = property_name + # TODO: set is_key to True if this component is a dictionary key + # self.is_key = is_key + + def __str__(self): + return self.property_name + + +class ListObjectPathComponent(_ObjectPathComponent): + def __init__(self, property_name, index): + self.property_name = property_name + self.index = index + + def __str__(self): + return "%s[%s]" % (self.property_name, self.index) + + +class ReferenceObjectPathComponent(_ObjectPathComponent): + def __init__(self, reference_property_name): + self.property_name = reference_property_name + + def __str__(self): + return self.property_name + + +class ObjectPath(object): + def __init__(self, object_type_name, property_path): + self.object_type_name = object_type_name + self.property_path = [x if isinstance(x, _ObjectPathComponent) else + _ObjectPathComponent.create_ObjectPathComponent(x) + for x in property_path] + + def __str__(self): + return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) + + def merge(self, other): + self.property_path.extend(other.property_path) + return self + + @staticmethod + def make_object_path(lhs): + path_as_parts = lhs.split(":") + return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) + + +class _PatternExpression(object): + + @staticmethod + def escape_quotes_and_backslashes(s): + return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") + + +class _ComparisonExpression(_PatternExpression): + def __init__(self, operator, lhs, rhs, negated=False): + if operator == "=" and isinstance(rhs, ListConstant): + self.operator = "IN" + else: + self.operator = operator + if isinstance(lhs, ObjectPath): + self.lhs = lhs + else: + self.lhs = ObjectPath.make_object_path(lhs) + if isinstance(rhs, _Constant): + self.rhs = rhs + else: + self.rhs = make_constant(rhs) + self.negated = negated + self.root_type = self.lhs.object_type_name + + def __str__(self): + # if isinstance(self.rhs, list): + # final_rhs = [] + # for r in self.rhs: + # final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") + # rhs_string = "(" + ", ".join(final_rhs) + ")" + # else: + # rhs_string = self.rhs + if self.negated: + return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) + else: + return "%s %s %s" % (self.lhs, self.operator, self.rhs) + + +class EqualityComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) + + +class GreaterThanComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) + + +class LessThanComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) + + +class GreaterThanEqualComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(GreaterThanEqualComparisonExpression, self).__init__(">=", lhs, rhs, negated) + + +class LessThanEqualComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LessThanEqualComparisonExpression, self).__init__("<=", lhs, rhs, negated) + + +class InComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) + + +class LikeComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) + + +class MatchesComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) + + +class IsSubsetComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) + + +class IsSupersetComparisonExpression(_ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) + + +class _BooleanExpression(_PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = [] + for arg in operands: + if not hasattr(self, "root_type"): + self.root_type = arg.root_type + elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": + raise ValueError("All operands to an 'AND' expression must have the same object type") + elif self.root_type and (self.root_type != arg.root_type): + self.root_type = None + self.operands.append(arg) + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append("%s" % o) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndBooleanExpression(_BooleanExpression): + def __init__(self, operands): + super(AndBooleanExpression, self).__init__("AND", operands) + + +class OrBooleanExpression(_BooleanExpression): + def __init__(self, operands): + super(OrBooleanExpression, self).__init__("OR", operands) + + +class ObservationExpression(_PatternExpression): + def __init__(self, operand): + self.operand = operand + + def __str__(self): + return "[%s]" % self.operand + + +class _CompoundObservationExpression(_PatternExpression): + def __init__(self, operator, operands): + self.operator = operator + self.operands = operands + + def __str__(self): + sub_exprs = [] + for o in self.operands: + sub_exprs.append("%s" % o) + return (" " + self.operator + " ").join(sub_exprs) + + +class AndObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(AndObservationExpression, self).__init__("AND", operands) + + +class OrObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(OrObservationExpression, self).__init__("OR", operands) + + +class FollowedByObservationExpression(_CompoundObservationExpression): + def __init__(self, operands): + super(FollowedByObservationExpression, self).__init__("FOLLOWEDBY", operands) + + +class ParentheticalExpression(_PatternExpression): + def __init__(self, exp): + self.expression = exp + if hasattr(exp, "root_type"): + self.root_type = exp.root_type + + def __str__(self): + return "(%s)" % self.expression + + +class _ExpressionQualifier(_PatternExpression): + pass + + +class RepeatQualifier(_ExpressionQualifier): + def __init__(self, times_to_repeat): + if isinstance(times_to_repeat, IntegerConstant): + self.times_to_repeat = times_to_repeat + elif isinstance(times_to_repeat, int): + self.times_to_repeat = IntegerConstant(times_to_repeat) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) + + def __str__(self): + return "REPEATS %s TIMES" % self.times_to_repeat + + +class WithinQualifier(_ExpressionQualifier): + def __init__(self, number_of_seconds): + if isinstance(number_of_seconds, IntegerConstant): + self.number_of_seconds = number_of_seconds + elif isinstance(number_of_seconds, int): + self.number_of_seconds = IntegerConstant(number_of_seconds) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) + + def __str__(self): + return "WITHIN %s SECONDS" % self.number_of_seconds + + +class StartStopQualifier(_ExpressionQualifier): + def __init__(self, start_time, stop_time): + if isinstance(start_time, IntegerConstant): + self.start_time = start_time + elif isinstance(start_time, int): + self.start_time = IntegerConstant(start_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) + if isinstance(stop_time, IntegerConstant): + self.stop_time = stop_time + elif isinstance(stop_time, int): + self.stop_time = IntegerConstant(stop_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) + + def __str__(self): + return "START %s STOP %s" % (self.start_time, self.stop_time) + + +class QualifiedObservationExpression(_PatternExpression): + def __init__(self, observation_expression, qualifier): + self.observation_expression = observation_expression + self.qualifier = qualifier + + def __str__(self): + return "%s %s" % (self.observation_expression, self.qualifier) diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index 75f3070..d5641e7 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -225,7 +225,7 @@ def test_parse_autonomous_system_valid(data): @pytest.mark.parametrize("data", [ """{ - "type": "email-address", + "type": "email-addr", "value": "john@example.com", "display_name": "John Doe", "belongs_to_ref": "0" @@ -233,7 +233,7 @@ def test_parse_autonomous_system_valid(data): ]) def test_parse_email_address(data): odata = stix2.parse_observable(data, {"0": "user-account"}) - assert odata.type == "email-address" + assert odata.type == "email-addr" odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data) with pytest.raises(stix2.exceptions.InvalidObjRefError): diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py new file mode 100644 index 0000000..e806aa6 --- /dev/null +++ b/stix2/test/test_pattern_expressions.py @@ -0,0 +1,172 @@ +import stix2 + + +def test_create_comparison_expression(): + + exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + stix2.HashConstant("aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "SHA-256")) # noqa + assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'" + + +def test_boolean_expression(): + exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", + stix2.StringConstant(".+\\@example\\.com$")) + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", + stix2.StringConstant("^Final Report.+\\.exe$")) + exp = stix2.AndBooleanExpression([exp1, exp2]) + assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa + + +def test_boolean_expression_with_parentheses(): + exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message", + [stix2.ReferenceObjectPathComponent("from_ref"), + stix2.BasicObjectPathComponent("value")]), + stix2.StringConstant(".+\\@example\\.com$")) + exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", + stix2.StringConstant("^Final Report.+\\.exe$")) + exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2])) + assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa + + +def test_hash_followed_by_registryKey_expression_python_constant(): + hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", + stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) + o_exp1 = stix2.ObservationExpression(hash_exp) + reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), + stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) + o_exp2 = stix2.ObservationExpression(reg_exp) + fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) + para_exp = stix2.ParentheticalExpression(fb_exp) + qual_exp = stix2.WithinQualifier(300) + exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + + +def test_hash_followed_by_registryKey_expression(): + hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", + stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) + o_exp1 = stix2.ObservationExpression(hash_exp) + reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), + stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) + o_exp2 = stix2.ObservationExpression(reg_exp) + fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) + para_exp = stix2.ParentheticalExpression(fb_exp) + qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) + exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + + +def test_file_observable_expression(): + exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + stix2.HashConstant( + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", + 'SHA-256')) + exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf")) + bool_exp = stix2.AndBooleanExpression([exp1, exp2]) + exp = stix2.ObservationExpression(bool_exp) + assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa + + +def test_multiple_file_observable_expression(): + exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + stix2.HashConstant( + "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c", + 'SHA-256')) + exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5", + stix2.HashConstant("cead3f77f6cda6ec00f57d76c9a6879f", "MD5")) + bool1_exp = stix2.OrBooleanExpression([exp1, exp2]) + exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", + stix2.HashConstant( + "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", + 'SHA-256')) + op1_exp = stix2.ObservationExpression(bool1_exp) + op2_exp = stix2.ObservationExpression(exp3) + exp = stix2.AndObservationExpression([op1_exp, op2_exp]) + assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa + + +def test_root_types(): + ast = stix2.ObservationExpression( + stix2.AndBooleanExpression( + [stix2.ParentheticalExpression( + stix2.OrBooleanExpression([ + stix2.EqualityComparisonExpression("a:b", stix2.StringConstant("1")), + stix2.EqualityComparisonExpression("b:c", stix2.StringConstant("2"))])), + stix2.EqualityComparisonExpression(u"b:d", stix2.StringConstant("3"))])) + assert str(ast) == "[(a:b = '1' OR b:c = '2') AND b:d = '3']" + + +def test_artifact_payload(): + exp1 = stix2.EqualityComparisonExpression("artifact:mime_type", + "application/vnd.tcpdump.pcap") + exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", + stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) + and_exp = stix2.AndBooleanExpression([exp1, exp2]) + exp = stix2.ObservationExpression(and_exp) + assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa + + +def test_greater_than_python_constant(): + exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", + 7.0) + exp = stix2.ObservationExpression(exp1) + assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" + + +def test_greater_than(): + exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", + stix2.FloatConstant(7.0)) + exp = stix2.ObservationExpression(exp1) + assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" + + +def test_and_observable_expression(): + exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1007")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Peter")]) + exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1008")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Paul")]) + exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1009")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Mary")]) + exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1), + stix2.ObservationExpression(exp2), + stix2.ObservationExpression(exp3)]) + assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa + + +def test_hex(): + exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type", + "image/bmp"), + stix2.EqualityComparisonExpression("file:magic_number_hex", + stix2.HexConstant("ffd8"))]) + exp = stix2.ObservationExpression(exp_and) + assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" + + +def test_multiple_qualifiers(): + exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type", + "domain-name"), + stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", + "example.com")]) + exp_ob = stix2.ObservationExpression(exp_and) + qual_rep = stix2.RepeatQualifier(5) + qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) + exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) + assert str(exp) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" # noqa + + +def test_set_op(): + exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64")) + assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"