parent
							
								
									d453bf6f1a
								
							
						
					
					
						commit
						3abfe7868a
					
				|  | @ -31,11 +31,12 @@ from .environment import Environment, ObjectFactory | |||
| from .markings import (add_markings, clear_markings, get_markings, is_marked, | ||||
|                        remove_markings, set_markings) | ||||
| from .patterns import (AndBooleanExpression, AndObservationExpression, | ||||
|                        BasicObjectPathComponent, EqualityComparisonExpression, | ||||
|                        BasicObjectPathComponent, BinaryConstant, | ||||
|                        BooleanConstant, EqualityComparisonExpression, | ||||
|                        FloatConstant, FollowedByObservationExpression, | ||||
|                        GreaterThanComparisonExpression, | ||||
|                        GreaterThanEqualComparisonExpression, HashConstant, | ||||
|                        HexConstant, IntegerConstant, | ||||
|                        HexConstant, InComparisonExpression, IntegerConstant, | ||||
|                        IsSubsetComparisonExpression, | ||||
|                        IsSupersetComparisonExpression, | ||||
|                        LessThanComparisonExpression, | ||||
|  |  | |||
|  | @ -3,8 +3,11 @@ | |||
| 
 | ||||
| import base64 | ||||
| import binascii | ||||
| import datetime | ||||
| import re | ||||
| 
 | ||||
| from .utils import parse_into_datetime | ||||
| 
 | ||||
| 
 | ||||
| def escape_quotes_and_backslashes(s): | ||||
|     return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") | ||||
|  | @ -24,10 +27,13 @@ class StringConstant(_Constant): | |||
| 
 | ||||
| class TimestampConstant(_Constant): | ||||
|     def __init__(self, value): | ||||
|         self.value = value | ||||
|         try: | ||||
|             self.value = parse_into_datetime(value) | ||||
|         except Exception: | ||||
|             raise ValueError("must be a datetime object or timestamp string.") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "t'%s'" % escape_quotes_and_backslashes(self.value) | ||||
|         return "t%s" % repr(self.value) | ||||
| 
 | ||||
| 
 | ||||
| class IntegerConstant(_Constant): | ||||
|  | @ -46,7 +52,7 @@ class FloatConstant(_Constant): | |||
|         try: | ||||
|             self.value = float(value) | ||||
|         except Exception: | ||||
|             raise ValueError("must be an float.") | ||||
|             raise ValueError("must be a float.") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s" % self.value | ||||
|  | @ -56,24 +62,29 @@ class BooleanConstant(_Constant): | |||
|     def __init__(self, value): | ||||
|         if isinstance(value, bool): | ||||
|             self.value = value | ||||
|             return | ||||
| 
 | ||||
|         trues = ['true', 't'] | ||||
|         falses = ['false', 'f'] | ||||
|         try: | ||||
|             if value.lower() in trues: | ||||
|                 self.value = True | ||||
|             if value.lower() in falses: | ||||
|                 return | ||||
|             elif value.lower() in falses: | ||||
|                 self.value = False | ||||
|                 return | ||||
|         except AttributeError: | ||||
|             if value == 1: | ||||
|                 self.value = True | ||||
|             if value == 0: | ||||
|                 return | ||||
|             elif value == 0: | ||||
|                 self.value = False | ||||
|                 return | ||||
| 
 | ||||
|         raise ValueError("must be a boolean value.") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s" % self.value | ||||
|         return str(self.value).lower() | ||||
| 
 | ||||
| 
 | ||||
| _HASH_REGEX = { | ||||
|  | @ -132,20 +143,25 @@ class ListConstant(_Constant): | |||
|         self.value = values | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" | ||||
|         return "(" + ", ".join([("%s" % make_constant(x)) for x in self.value]) + ")" | ||||
| 
 | ||||
| 
 | ||||
| def make_constant(value): | ||||
|     try: | ||||
|         return parse_into_datetime(value) | ||||
|     except ValueError: | ||||
|         pass | ||||
| 
 | ||||
|     if isinstance(value, str): | ||||
|         return StringConstant(value) | ||||
|     elif isinstance(value, bool): | ||||
|         return BooleanConstant(value) | ||||
|     elif isinstance(value, int): | ||||
|         return IntegerConstant(value) | ||||
|     elif isinstance(value, float): | ||||
|         return FloatConstant(value) | ||||
|     elif isinstance(value, list): | ||||
|         return ListConstant(value) | ||||
|     elif isinstance(value, bool): | ||||
|         return BooleanConstant(value) | ||||
|     else: | ||||
|         raise ValueError("Unable to create a constant from %s" % value) | ||||
| 
 | ||||
|  | @ -210,15 +226,12 @@ class ObjectPath(object): | |||
| 
 | ||||
| 
 | ||||
| class _PatternExpression(object): | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def escape_quotes_and_backslashes(s): | ||||
|         return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| class _ComparisonExpression(_PatternExpression): | ||||
|     def __init__(self, operator, lhs, rhs, negated=False): | ||||
|         if operator == "=" and isinstance(rhs, ListConstant): | ||||
|         if operator == "=" and isinstance(rhs, (ListConstant, list)): | ||||
|             self.operator = "IN" | ||||
|         else: | ||||
|             self.operator = operator | ||||
|  | @ -234,13 +247,6 @@ class _ComparisonExpression(_PatternExpression): | |||
|         self.root_type = self.lhs.object_type_name | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         # if isinstance(self.rhs, list): | ||||
|         #     final_rhs = [] | ||||
|         #     for r in self.rhs: | ||||
|         #         final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") | ||||
|         #     rhs_string = "(" + ", ".join(final_rhs) + ")" | ||||
|         # else: | ||||
|         #     rhs_string = self.rhs | ||||
|         if self.negated: | ||||
|             return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) | ||||
|         else: | ||||
|  | @ -383,7 +389,7 @@ class RepeatQualifier(_ExpressionQualifier): | |||
|         elif isinstance(times_to_repeat, int): | ||||
|             self.times_to_repeat = IntegerConstant(times_to_repeat) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) | ||||
|             raise ValueError("%s is not a valid argument for a Repeat Qualifier" % times_to_repeat) | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "REPEATS %s TIMES" % self.times_to_repeat | ||||
|  | @ -404,18 +410,18 @@ class WithinQualifier(_ExpressionQualifier): | |||
| 
 | ||||
| class StartStopQualifier(_ExpressionQualifier): | ||||
|     def __init__(self, start_time, stop_time): | ||||
|         if isinstance(start_time, IntegerConstant): | ||||
|         if isinstance(start_time, TimestampConstant): | ||||
|             self.start_time = start_time | ||||
|         elif isinstance(start_time, int): | ||||
|             self.start_time = IntegerConstant(start_time) | ||||
|         elif isinstance(start_time, datetime.date): | ||||
|             self.start_time = TimestampConstant(start_time) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) | ||||
|         if isinstance(stop_time, IntegerConstant): | ||||
|             raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % start_time) | ||||
|         if isinstance(stop_time, TimestampConstant): | ||||
|             self.stop_time = stop_time | ||||
|         elif isinstance(stop_time, int): | ||||
|             self.stop_time = IntegerConstant(stop_time) | ||||
|         elif isinstance(stop_time, datetime.date): | ||||
|             self.stop_time = TimestampConstant(stop_time) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) | ||||
|             raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % stop_time) | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "START %s STOP %s" % (self.start_time, self.stop_time) | ||||
|  |  | |||
|  | @ -1,3 +1,7 @@ | |||
| import datetime | ||||
| 
 | ||||
| import pytest | ||||
| 
 | ||||
| import stix2 | ||||
| 
 | ||||
| 
 | ||||
|  | @ -67,7 +71,11 @@ def test_file_observable_expression(): | |||
|     assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_multiple_file_observable_expression(): | ||||
| @pytest.mark.parametrize("observation_class, op", [ | ||||
|     (stix2.AndObservationExpression, 'AND'), | ||||
|     (stix2.OrObservationExpression, 'OR'), | ||||
| ]) | ||||
| def test_multiple_file_observable_expression(observation_class, op): | ||||
|     exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", | ||||
|                                               stix2.HashConstant( | ||||
|                                                   "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c", | ||||
|  | @ -81,8 +89,8 @@ def test_multiple_file_observable_expression(): | |||
|                                                   'SHA-256')) | ||||
|     op1_exp = stix2.ObservationExpression(bool1_exp) | ||||
|     op2_exp = stix2.ObservationExpression(exp3) | ||||
|     exp = stix2.AndObservationExpression([op1_exp, op2_exp]) | ||||
|     assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']"  # noqa | ||||
|     exp = observation_class([op1_exp, op2_exp]) | ||||
|     assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] {} [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']".format(op)  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_root_types(): | ||||
|  | @ -120,6 +128,31 @@ def test_greater_than(): | |||
|     assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" | ||||
| 
 | ||||
| 
 | ||||
| def test_less_than(): | ||||
|     exp = stix2.LessThanComparisonExpression("file:size", | ||||
|                                              1024) | ||||
|     assert str(exp) == "file:size < 1024" | ||||
| 
 | ||||
| 
 | ||||
| def test_greater_than_or_equal(): | ||||
|     exp = stix2.GreaterThanEqualComparisonExpression("file:size", | ||||
|                                                      1024) | ||||
|     assert str(exp) == "file:size >= 1024" | ||||
| 
 | ||||
| 
 | ||||
| def test_less_than_or_equal(): | ||||
|     exp = stix2.LessThanEqualComparisonExpression("file:size", | ||||
|                                                   1024) | ||||
|     assert str(exp) == "file:size <= 1024" | ||||
| 
 | ||||
| 
 | ||||
| def test_not(): | ||||
|     exp = stix2.LessThanComparisonExpression("file:size", | ||||
|                                              1024, | ||||
|                                              negated=True) | ||||
|     assert str(exp) == "file:size NOT < 1024" | ||||
| 
 | ||||
| 
 | ||||
| def test_and_observable_expression(): | ||||
|     exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", | ||||
|                                                                           "unix"), | ||||
|  | @ -145,6 +178,15 @@ def test_and_observable_expression(): | |||
|     assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_and_observable_expression(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:display_name", | ||||
|                                                                        "admin"), | ||||
|                                     stix2.EqualityComparisonExpression("email-addr:display_name", | ||||
|                                                                        stix2.StringConstant("admin"))]) | ||||
|     assert "All operands to an 'AND' expression must have the same object type" in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_hex(): | ||||
|     exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type", | ||||
|                                                                              "image/bmp"), | ||||
|  | @ -175,3 +217,158 @@ def test_set_op(): | |||
| def test_timestamp(): | ||||
|     ts = stix2.TimestampConstant('2014-01-13T07:03:17Z') | ||||
|     assert str(ts) == "t'2014-01-13T07:03:17Z'" | ||||
| 
 | ||||
| 
 | ||||
| def test_boolean(): | ||||
|     exp = stix2.EqualityComparisonExpression("email-message:is_multipart", | ||||
|                                              True) | ||||
|     assert str(exp) == "email-message:is_multipart = true" | ||||
| 
 | ||||
| 
 | ||||
| def test_binary(): | ||||
|     const = stix2.BinaryConstant("dGhpcyBpcyBhIHRlc3Q=") | ||||
|     exp = stix2.EqualityComparisonExpression("artifact:payload_bin", | ||||
|                                              const) | ||||
|     assert str(exp) == "artifact:payload_bin = b'dGhpcyBpcyBhIHRlc3Q='" | ||||
| 
 | ||||
| 
 | ||||
| def test_list(): | ||||
|     exp = stix2.InComparisonExpression("process:name", | ||||
|                                        ['proccy', 'proximus', 'badproc']) | ||||
|     assert str(exp) == "process:name IN ('proccy', 'proximus', 'badproc')" | ||||
| 
 | ||||
| 
 | ||||
| def test_list2(): | ||||
|     # alternate way to construct an "IN" Comparison Expression | ||||
|     exp = stix2.EqualityComparisonExpression("process:name", | ||||
|                                              ['proccy', 'proximus', 'badproc']) | ||||
|     assert str(exp) == "process:name IN ('proccy', 'proximus', 'badproc')" | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_constant_type(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.EqualityComparisonExpression("artifact:payload_bin", | ||||
|                                            {'foo': 'bar'}) | ||||
|     assert 'Unable to create a constant' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_integer_constant(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.IntegerConstant('foo') | ||||
|     assert 'must be an integer' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_timestamp_constant(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.TimestampConstant('foo') | ||||
|     assert 'must be a datetime object or timestamp string' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_float_constant(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.FloatConstant('foo') | ||||
|     assert 'must be a float' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize("data, result", [ | ||||
|     (True, True), | ||||
|     (False, False), | ||||
|     ('True', True), | ||||
|     ('False', False), | ||||
|     ('true', True), | ||||
|     ('false', False), | ||||
|     ('t', True), | ||||
|     ('f', False), | ||||
|     ('T', True), | ||||
|     ('F', False), | ||||
|     (1, True), | ||||
|     (0, False), | ||||
| ]) | ||||
| def test_boolean_constant(data, result): | ||||
|     boolean = stix2.BooleanConstant(data) | ||||
|     assert boolean.value == result | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_boolean_constant(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.BooleanConstant('foo') | ||||
|     assert 'must be a boolean' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize("hashtype, data", [ | ||||
|     ('MD5', 'zzz'), | ||||
|     ('ssdeep', 'zzz=='), | ||||
| ]) | ||||
| def test_invalid_hash_constant(hashtype, data): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.HashConstant(data, hashtype) | ||||
|     assert 'is not a valid {} hash'.format(hashtype) in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_hex_constant(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.HexConstant('mm') | ||||
|     assert "must contain an even number of hexadecimal characters" in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_binary_constant(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.BinaryConstant('foo') | ||||
|     assert 'must contain a base64' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_escape_quotes_and_backslashes(): | ||||
|     exp = stix2.MatchesComparisonExpression("file:name", | ||||
|                                             "^Final Report.+\.exe$") | ||||
|     assert str(exp) == "file:name MATCHES '^Final Report.+\\\\.exe$'" | ||||
| 
 | ||||
| 
 | ||||
| def test_like(): | ||||
|     exp = stix2.LikeComparisonExpression("directory:path", | ||||
|                                          "C:\Windows\%\\foo") | ||||
|     assert str(exp) == "directory:path LIKE 'C:\\\\Windows\\\\%\\\\foo'" | ||||
| 
 | ||||
| 
 | ||||
| def test_issuperset(): | ||||
|     exp = stix2.IsSupersetComparisonExpression("ipv4-addr:value", | ||||
|                                                "198.51.100.0/24") | ||||
|     assert str(exp) == "ipv4-addr:value ISSUPERSET '198.51.100.0/24'" | ||||
| 
 | ||||
| 
 | ||||
| def test_repeat_qualifier(): | ||||
|     qual = stix2.RepeatQualifier(stix2.IntegerConstant(5)) | ||||
|     assert str(qual) == 'REPEATS 5 TIMES' | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_repeat_qualifier(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.RepeatQualifier('foo') | ||||
|     assert 'is not a valid argument for a Repeat Qualifier' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_within_qualifier(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.WithinQualifier('foo') | ||||
|     assert 'is not a valid argument for a Within Qualifier' in str(excinfo) | ||||
| 
 | ||||
| 
 | ||||
| def test_startstop_qualifier(): | ||||
|     qual = stix2.StartStopQualifier(stix2.TimestampConstant('2016-06-01T00:00:00Z'), | ||||
|                                     datetime.datetime(2017, 3, 12, 8, 30, 0)) | ||||
|     assert str(qual) == "START t'2016-06-01T00:00:00Z' STOP t'2017-03-12T08:30:00Z'" | ||||
| 
 | ||||
|     qual2 = stix2.StartStopQualifier(datetime.date(2016, 6, 1), | ||||
|                                      stix2.TimestampConstant('2016-07-01T00:00:00Z')) | ||||
|     assert str(qual2) == "START t'2016-06-01T00:00:00Z' STOP t'2016-07-01T00:00:00Z'" | ||||
| 
 | ||||
| 
 | ||||
| def test_invalid_startstop_qualifier(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.StartStopQualifier('foo', | ||||
|                                  stix2.TimestampConstant('2016-06-01T00:00:00Z')) | ||||
|     assert 'is not a valid argument for a Start/Stop Qualifier' in str(excinfo) | ||||
| 
 | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         stix2.StartStopQualifier(datetime.date(2016, 6, 1), | ||||
|                                  'foo') | ||||
|     assert 'is not a valid argument for a Start/Stop Qualifier' in str(excinfo) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Chris Lenk
						Chris Lenk