From 6fa009e5094393be296ff1cfeecfa907e381ef0d Mon Sep 17 00:00:00 2001 From: Richard Piazza Date: Wed, 12 Jul 2017 17:02:51 -0400 Subject: [PATCH] added object_paths added more tests for pattern expressions added "set" comparison expressions implemented make_constant fixed type name for EmailAddress --- stix2/__init__.py | 7 ++- stix2/constants.py | 14 ++++- stix2/object_path.py | 59 +++++++++++++++++++ stix2/observables.py | 4 +- stix2/pattern_expressions.py | 56 +++++++++++++----- stix2/test/test_observed_data.py | 4 +- stix2/test/test_pattern_expressions.py | 79 +++++++++++++++++++++++++- 7 files changed, 197 insertions(+), 26 deletions(-) create mode 100644 stix2/object_path.py diff --git a/stix2/__init__.py b/stix2/__init__.py index a1eaa56..637672a 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -4,8 +4,9 @@ from . import exceptions from .bundle import Bundle -from .constants import (FloatConstant, HashConstant, IntegerConstant, - StringConstant) +from .constants import (FloatConstant, HashConstant, HexConstant, + IntegerConstant, StringConstant) +from .object_path import ObjectPath, ObjectPathComponent from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, AutonomousSystem, CustomObservable, Directory, DomainName, EmailAddress, EmailMessage, @@ -29,6 +30,8 @@ from .pattern_expressions import (AndBooleanExpression, FollowedByObservableExpression, GreaterThanComparisonExpression, GreaterThanEqualComparisonExpression, + IsSubsetComparisonExpression, + IsSupersetComparisonExpression, LessThanComparisonExpression, LessThanEqualComparisonExpression, LikeComparisonExpression, diff --git a/stix2/constants.py b/stix2/constants.py index fb84c19..142e9d7 100644 --- a/stix2/constants.py +++ b/stix2/constants.py @@ -121,5 +121,15 @@ class ListConstant(Constant): def make_constant(value): - # TODO: Stub - pass + if isinstance(value, str): + return StringConstant(value) + elif isinstance(value, int): + return IntegerConstant(value) + elif isinstance(value, float): + return FloatConstant(value) + elif isinstance(value, list): + return ListConstant(value) + elif isinstance(value, bool): + return BooleanConstant(value) + else: + raise ValueError("Unable to create a constant from %s" % value) diff --git a/stix2/object_path.py b/stix2/object_path.py new file mode 100644 index 0000000..22ebef7 --- /dev/null +++ b/stix2/object_path.py @@ -0,0 +1,59 @@ + +class ObjectPathComponent(object): + pass + + +class BasicObjectPathComponent(ObjectPathComponent): + def __init__(self, property_name, is_key=False): + self.property_name = property_name + # TODO: set is_key to True if this component is a dictionary key + # self.is_key = is_key + + def __str__(self): + return self.property_name + + +class ListObjectPathComponent(ObjectPathComponent): + def __init__(self, property_name, index): + self.property_name = property_name + self.index = index + + def __str__(self): + return "%s[%s]" % (self.property_name, self.index) + + +class ReferenceObjectPathComponent(ObjectPathComponent): + def __init__(self, reference_property_name): + self.property_name = reference_property_name + + def __str__(self): + return self.property_name + + +class ObjectPath(object): + def __init__(self, object_type_name, property_path): + self.object_type_name = object_type_name + self.property_path = [x if isinstance(x, ObjectPathComponent) else ObjectPath.create_ObjectPathComponent(x) + for x in property_path] + + def __str__(self): + return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) + + def merge(self, other): + self.property_path.extend(other.property_path) + return self + + @staticmethod + def make_object_path(lhs): + path_as_parts = lhs.split(":") + return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) + + @staticmethod + def create_ObjectPathComponent(component_name): + if component_name.endswith("_ref"): + return ReferenceObjectPathComponent(component_name) + elif component_name.find("[") != -1: + parse1 = component_name.split("[") + return ListObjectPathComponent(parse1[0], parse1[1][:-1]) + else: + return BasicObjectPathComponent(component_name) diff --git a/stix2/observables.py b/stix2/observables.py index e38e298..366e007 100644 --- a/stix2/observables.py +++ b/stix2/observables.py @@ -123,7 +123,7 @@ class DomainName(_Observable): class EmailAddress(_Observable): - _type = 'email-address' + _type = 'email-addr' _properties = { 'type': TypeProperty(_type), 'value': StringProperty(required=True), @@ -651,7 +651,7 @@ OBJ_MAP_OBSERVABLE = { 'autonomous-system': AutonomousSystem, 'directory': Directory, 'domain-name': DomainName, - 'email-address': EmailAddress, + 'email-addr': EmailAddress, 'email-message': EmailMessage, 'file': File, 'ipv4-addr': IPv4Address, diff --git a/stix2/pattern_expressions.py b/stix2/pattern_expressions.py index 12aadc2..6804389 100644 --- a/stix2/pattern_expressions.py +++ b/stix2/pattern_expressions.py @@ -1,13 +1,9 @@ -from .constants import ListConstant, make_constant +from .constants import Constant, IntegerConstant, ListConstant, make_constant +from .object_path import ObjectPath class PatternExpression(object): - @staticmethod - def get_root_from_object_path(lhs): - path_as_parts = lhs.split(":") - return path_as_parts[0] - @staticmethod def escape_quotes_and_backslashes(s): return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") @@ -19,13 +15,16 @@ class ComparisonExpression(PatternExpression): self.operator = "IN" else: self.operator = operator - self.lhs = lhs - if isinstance(rhs, str): - self.rhs = make_constant(rhs) + if isinstance(lhs, ObjectPath): + self.lhs = lhs else: + self.lhs = ObjectPath.make_object_path(lhs) + if isinstance(rhs, Constant): self.rhs = rhs + else: + self.rhs = make_constant(rhs) self.negated = negated - self.root_type = self.get_root_from_object_path(lhs) + self.root_type = self.lhs.object_type_name def __str__(self): # if isinstance(self.rhs, list): @@ -81,7 +80,14 @@ class MatchesComparisonExpression(ComparisonExpression): super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) -# TODO: ISASUBSET, ISSUPERSET +class IsSubsetComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) + + +class IsSupersetComparisonExpression(ComparisonExpression): + def __init__(self, lhs, rhs, negated=False): + super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) class BooleanExpression(PatternExpression): @@ -165,7 +171,12 @@ class ExpressionQualifier(PatternExpression): class RepeatQualifier(ExpressionQualifier): def __init__(self, times_to_repeat): - self.times_to_repeat = times_to_repeat + if isinstance(times_to_repeat, IntegerConstant): + self.times_to_repeat = times_to_repeat + elif isinstance(times_to_repeat, int): + self.times_to_repeat = IntegerConstant(times_to_repeat) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) def __str__(self): return "REPEATS %s TIMES" % self.times_to_repeat @@ -173,7 +184,12 @@ class RepeatQualifier(ExpressionQualifier): class WithinQualifier(ExpressionQualifier): def __init__(self, number_of_seconds): - self.number_of_seconds = number_of_seconds + if isinstance(number_of_seconds, IntegerConstant): + self.number_of_seconds = number_of_seconds + elif isinstance(number_of_seconds, int): + self.number_of_seconds = IntegerConstant(number_of_seconds) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) def __str__(self): return "WITHIN %s SECONDS" % self.number_of_seconds @@ -181,8 +197,18 @@ class WithinQualifier(ExpressionQualifier): class StartStopQualifier(ExpressionQualifier): def __init__(self, start_time, stop_time): - self.start_time = start_time - self.stop_time = stop_time + if isinstance(start_time, IntegerConstant): + self.start_time = start_time + elif isinstance(start_time, int): + self.start_time = IntegerConstant(start_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) + if isinstance(stop_time, IntegerConstant): + self.stop_time = stop_time + elif isinstance(stop_time, int): + self.stop_time = IntegerConstant(stop_time) + else: + raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) def __str__(self): return "START %s STOP %s" % (self.start_time, self.stop_time) diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index 75f3070..d5641e7 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -225,7 +225,7 @@ def test_parse_autonomous_system_valid(data): @pytest.mark.parametrize("data", [ """{ - "type": "email-address", + "type": "email-addr", "value": "john@example.com", "display_name": "John Doe", "belongs_to_ref": "0" @@ -233,7 +233,7 @@ def test_parse_autonomous_system_valid(data): ]) def test_parse_email_address(data): odata = stix2.parse_observable(data, {"0": "user-account"}) - assert odata.type == "email-address" + assert odata.type == "email-addr" odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data) with pytest.raises(stix2.exceptions.InvalidObjRefError): diff --git a/stix2/test/test_pattern_expressions.py b/stix2/test/test_pattern_expressions.py index fdfb1ee..2e8495b 100644 --- a/stix2/test/test_pattern_expressions.py +++ b/stix2/test/test_pattern_expressions.py @@ -26,18 +26,32 @@ def test_boolean_expression_with_parentheses(): assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa +def test_hash_followed_by_registryKey_expression_python_constant(): + hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", + stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) + o_exp1 = stix2.ObservableExpression(hash_exp) + reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), + stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) + o_exp2 = stix2.ObservableExpression(reg_exp) + fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) + para_exp = stix2.ParentheticalExpression(fb_exp) + qual_exp = stix2.WithinQualifier(300) + exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + + def test_hash_followed_by_registryKey_expression(): hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) o_exp1 = stix2.ObservableExpression(hash_exp) - reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key", + reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) o_exp2 = stix2.ObservableExpression(reg_exp) fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) para_exp = stix2.ParentheticalExpression(fb_exp) qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) - assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa + assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa def test_file_observable_expression(): @@ -82,7 +96,7 @@ def test_root_types(): def test_artifact_payload(): exp1 = stix2.EqualityComparisonExpression("artifact:mime_type", - stix2.StringConstant("application/vnd.tcpdump.pcap")) + "application/vnd.tcpdump.pcap") exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) and_exp = stix2.AndBooleanExpression([exp1, exp2]) @@ -90,8 +104,67 @@ def test_artifact_payload(): assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa +def test_greater_than_python_constant(): + exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", + 7.0) + exp = stix2.ObservableExpression(exp1) + assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" + + def test_greater_than(): exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", stix2.FloatConstant(7.0)) exp = stix2.ObservableExpression(exp1) assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" + + +def test_and_observable_expression(): + exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1007")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Peter")]) + exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1008")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Paul")]) + exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", + "unix"), + stix2.EqualityComparisonExpression("user-account:user_id", + stix2.StringConstant("1009")), + stix2.EqualityComparisonExpression("user-account:account_login", + "Mary")]) + exp = stix2.AndObservableExpression([stix2.ObservableExpression(exp1), + stix2.ObservableExpression(exp2), + stix2.ObservableExpression(exp3)]) + assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa + + +def test_hex(): + exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type", + "image/bmp"), + stix2.EqualityComparisonExpression("file:magic_number_hex", + stix2.HexConstant("ffd8"))]) + exp = stix2.ObservableExpression(exp_and) + assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" + + +def test_multiple_qualifiers(): + exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type", + "domain-name"), + stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", + "example.com")]) + exp_ob = stix2.ObservableExpression(exp_and) + qual_rep = stix2.RepeatQualifier(5) + qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) + exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) + assert str(exp) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" # noqa + + +def test_set_op(): + exp = stix2.ObservableExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64")) + assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"