commit
						bac87465cb
					
				|  | @ -1,13 +1,12 @@ | |||
| -   repo: https://github.com/pre-commit/pre-commit-hooks | ||||
|     sha: e626cd57090d8df0be21e4df0f4e55cc3511d6ab | ||||
|     sha: ea227f024bd89d638aea319c92806737e3375979 | ||||
|     hooks: | ||||
|     -   id: trailing-whitespace | ||||
|     -   id: flake8 | ||||
|         args: | ||||
|         - --max-line-length=160 | ||||
|     -   id: check-merge-conflict | ||||
| 
 | ||||
| - repo: https://github.com/FalconSocial/pre-commit-python-sorter | ||||
|   sha: 1.0.4 | ||||
|   hooks: | ||||
|   - id: python-import-sorter | ||||
| -   repo: https://github.com/FalconSocial/pre-commit-python-sorter | ||||
|     sha: b57843b0b874df1d16eb0bef00b868792cb245c2 | ||||
|     hooks: | ||||
|     -   id: python-import-sorter | ||||
|  |  | |||
|  | @ -21,6 +21,23 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, | |||
| from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, | ||||
|                     ExternalReference, GranularMarking, KillChainPhase, | ||||
|                     MarkingDefinition, StatementMarking, TLPMarking) | ||||
| from .patterns import (AndBooleanExpression, AndObservationExpression, | ||||
|                        BasicObjectPathComponent, EqualityComparisonExpression, | ||||
|                        FloatConstant, FollowedByObservationExpression, | ||||
|                        GreaterThanComparisonExpression, | ||||
|                        GreaterThanEqualComparisonExpression, HashConstant, | ||||
|                        HexConstant, IntegerConstant, | ||||
|                        IsSubsetComparisonExpression, | ||||
|                        IsSupersetComparisonExpression, | ||||
|                        LessThanComparisonExpression, | ||||
|                        LessThanEqualComparisonExpression, | ||||
|                        LikeComparisonExpression, ListConstant, | ||||
|                        ListObjectPathComponent, MatchesComparisonExpression, | ||||
|                        ObjectPath, ObservationExpression, OrBooleanExpression, | ||||
|                        OrObservationExpression, ParentheticalExpression, | ||||
|                        QualifiedObservationExpression, | ||||
|                        ReferenceObjectPathComponent, RepeatQualifier, | ||||
|                        StartStopQualifier, StringConstant, WithinQualifier) | ||||
| from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, | ||||
|                   Identity, Indicator, IntrusionSet, Malware, ObservedData, | ||||
|                   Report, ThreatActor, Tool, Vulnerability) | ||||
|  |  | |||
|  | @ -123,7 +123,7 @@ class DomainName(_Observable): | |||
| 
 | ||||
| 
 | ||||
| class EmailAddress(_Observable): | ||||
|     _type = 'email-address' | ||||
|     _type = 'email-addr' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'value': StringProperty(required=True), | ||||
|  | @ -651,7 +651,7 @@ OBJ_MAP_OBSERVABLE = { | |||
|     'autonomous-system': AutonomousSystem, | ||||
|     'directory': Directory, | ||||
|     'domain-name': DomainName, | ||||
|     'email-address': EmailAddress, | ||||
|     'email-addr': EmailAddress, | ||||
|     'email-message': EmailMessage, | ||||
|     'file': File, | ||||
|     'ipv4-addr': IPv4Address, | ||||
|  |  | |||
|  | @ -0,0 +1,419 @@ | |||
| import base64 | ||||
| import binascii | ||||
| import re | ||||
| 
 | ||||
| 
 | ||||
| def escape_quotes_and_backslashes(s): | ||||
|     return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") | ||||
| 
 | ||||
| 
 | ||||
| class _Constant(object): | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| class StringConstant(_Constant): | ||||
|     def __init__(self, value): | ||||
|         self.value = value | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "'%s'" % escape_quotes_and_backslashes(self.value) | ||||
| 
 | ||||
| 
 | ||||
| class IntegerConstant(_Constant): | ||||
|     def __init__(self, value): | ||||
|         try: | ||||
|             self.value = int(value) | ||||
|         except Exception: | ||||
|             raise ValueError("must be an integer.") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s" % self.value | ||||
| 
 | ||||
| 
 | ||||
| class FloatConstant(_Constant): | ||||
|     def __init__(self, value): | ||||
|         try: | ||||
|             self.value = float(value) | ||||
|         except Exception: | ||||
|             raise ValueError("must be an float.") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s" % self.value | ||||
| 
 | ||||
| 
 | ||||
| class BooleanConstant(_Constant): | ||||
|     def __init__(self, value): | ||||
|         if isinstance(value, bool): | ||||
|             self.value = value | ||||
| 
 | ||||
|         trues = ['true', 't'] | ||||
|         falses = ['false', 'f'] | ||||
|         try: | ||||
|             if value.lower() in trues: | ||||
|                 self.value = True | ||||
|             if value.lower() in falses: | ||||
|                 self.value = False | ||||
|         except AttributeError: | ||||
|             if value == 1: | ||||
|                 self.value = True | ||||
|             if value == 0: | ||||
|                 self.value = False | ||||
| 
 | ||||
|         raise ValueError("must be a boolean value.") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s" % self.value | ||||
| 
 | ||||
| 
 | ||||
| _HASH_REGEX = { | ||||
|     "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), | ||||
|     "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), | ||||
|     "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), | ||||
|     "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), | ||||
|     "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), | ||||
|     "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), | ||||
|     "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), | ||||
|     "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), | ||||
|     "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), | ||||
|     "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), | ||||
|     "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), | ||||
|     "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), | ||||
|     "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), | ||||
|     "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| class HashConstant(StringConstant): | ||||
|     def __init__(self, value, type): | ||||
|         key = type.upper().replace('-', '') | ||||
|         if key in _HASH_REGEX: | ||||
|             vocab_key = _HASH_REGEX[key][1] | ||||
|             if not re.match(_HASH_REGEX[key][0], value): | ||||
|                 raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key)) | ||||
|             self.value = value | ||||
| 
 | ||||
| 
 | ||||
| class BinaryConstant(_Constant): | ||||
| 
 | ||||
|     def __init__(self, value): | ||||
|         try: | ||||
|             base64.b64decode(value) | ||||
|             self.value = value | ||||
|         except (binascii.Error, TypeError): | ||||
|             raise ValueError("must contain a base64 encoded string") | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "b'%s'" % self.value | ||||
| 
 | ||||
| 
 | ||||
| class HexConstant(_Constant): | ||||
|     def __init__(self, value): | ||||
|         if not re.match('^([a-fA-F0-9]{2})+$', value): | ||||
|             raise ValueError("must contain an even number of hexadecimal characters") | ||||
|         self.value = value | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "h'%s'" % self.value | ||||
| 
 | ||||
| 
 | ||||
| class ListConstant(_Constant): | ||||
|     def __init__(self, values): | ||||
|         self.value = values | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "(" + ", ".join([("%s" % x) for x in self.value]) + ")" | ||||
| 
 | ||||
| 
 | ||||
| def make_constant(value): | ||||
|     if isinstance(value, str): | ||||
|         return StringConstant(value) | ||||
|     elif isinstance(value, int): | ||||
|         return IntegerConstant(value) | ||||
|     elif isinstance(value, float): | ||||
|         return FloatConstant(value) | ||||
|     elif isinstance(value, list): | ||||
|         return ListConstant(value) | ||||
|     elif isinstance(value, bool): | ||||
|         return BooleanConstant(value) | ||||
|     else: | ||||
|         raise ValueError("Unable to create a constant from %s" % value) | ||||
| 
 | ||||
| 
 | ||||
| class _ObjectPathComponent(object): | ||||
|     @staticmethod | ||||
|     def create_ObjectPathComponent(component_name): | ||||
|         if component_name.endswith("_ref"): | ||||
|             return ReferenceObjectPathComponent(component_name) | ||||
|         elif component_name.find("[") != -1: | ||||
|             parse1 = component_name.split("[") | ||||
|             return ListObjectPathComponent(parse1[0], parse1[1][:-1]) | ||||
|         else: | ||||
|             return BasicObjectPathComponent(component_name) | ||||
| 
 | ||||
| 
 | ||||
| class BasicObjectPathComponent(_ObjectPathComponent): | ||||
|     def __init__(self, property_name, is_key=False): | ||||
|         self.property_name = property_name | ||||
|         # TODO: set is_key to True if this component is a dictionary key | ||||
|         # self.is_key = is_key | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return self.property_name | ||||
| 
 | ||||
| 
 | ||||
| class ListObjectPathComponent(_ObjectPathComponent): | ||||
|     def __init__(self, property_name, index): | ||||
|         self.property_name = property_name | ||||
|         self.index = index | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s[%s]" % (self.property_name, self.index) | ||||
| 
 | ||||
| 
 | ||||
| class ReferenceObjectPathComponent(_ObjectPathComponent): | ||||
|     def __init__(self, reference_property_name): | ||||
|         self.property_name = reference_property_name | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return self.property_name | ||||
| 
 | ||||
| 
 | ||||
| class ObjectPath(object): | ||||
|     def __init__(self, object_type_name, property_path): | ||||
|         self.object_type_name = object_type_name | ||||
|         self.property_path = [x if isinstance(x, _ObjectPathComponent) else | ||||
|                               _ObjectPathComponent.create_ObjectPathComponent(x) | ||||
|                               for x in property_path] | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path])) | ||||
| 
 | ||||
|     def merge(self, other): | ||||
|         self.property_path.extend(other.property_path) | ||||
|         return self | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def make_object_path(lhs): | ||||
|         path_as_parts = lhs.split(":") | ||||
|         return ObjectPath(path_as_parts[0], path_as_parts[1].split(".")) | ||||
| 
 | ||||
| 
 | ||||
| class _PatternExpression(object): | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def escape_quotes_and_backslashes(s): | ||||
|         return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'") | ||||
| 
 | ||||
| 
 | ||||
| class _ComparisonExpression(_PatternExpression): | ||||
|     def __init__(self, operator, lhs, rhs, negated=False): | ||||
|         if operator == "=" and isinstance(rhs, ListConstant): | ||||
|             self.operator = "IN" | ||||
|         else: | ||||
|             self.operator = operator | ||||
|         if isinstance(lhs, ObjectPath): | ||||
|             self.lhs = lhs | ||||
|         else: | ||||
|             self.lhs = ObjectPath.make_object_path(lhs) | ||||
|         if isinstance(rhs, _Constant): | ||||
|             self.rhs = rhs | ||||
|         else: | ||||
|             self.rhs = make_constant(rhs) | ||||
|         self.negated = negated | ||||
|         self.root_type = self.lhs.object_type_name | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         # if isinstance(self.rhs, list): | ||||
|         #     final_rhs = [] | ||||
|         #     for r in self.rhs: | ||||
|         #         final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'") | ||||
|         #     rhs_string = "(" + ", ".join(final_rhs) + ")" | ||||
|         # else: | ||||
|         #     rhs_string = self.rhs | ||||
|         if self.negated: | ||||
|             return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs) | ||||
|         else: | ||||
|             return "%s %s %s" % (self.lhs, self.operator, self.rhs) | ||||
| 
 | ||||
| 
 | ||||
| class EqualityComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class GreaterThanComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class LessThanComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class GreaterThanEqualComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(GreaterThanEqualComparisonExpression, self).__init__(">=", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class LessThanEqualComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(LessThanEqualComparisonExpression, self).__init__("<=", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class InComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class LikeComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class MatchesComparisonExpression(_ComparisonExpression): | ||||
|     def __init__(self, lhs, rhs, negated=False): | ||||
|         super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class IsSubsetComparisonExpression(_ComparisonExpression): | ||||
|         def __init__(self, lhs, rhs, negated=False): | ||||
|             super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class IsSupersetComparisonExpression(_ComparisonExpression): | ||||
|         def __init__(self, lhs, rhs, negated=False): | ||||
|             super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated) | ||||
| 
 | ||||
| 
 | ||||
| class _BooleanExpression(_PatternExpression): | ||||
|     def __init__(self, operator, operands): | ||||
|         self.operator = operator | ||||
|         self.operands = [] | ||||
|         for arg in operands: | ||||
|             if not hasattr(self, "root_type"): | ||||
|                 self.root_type = arg.root_type | ||||
|             elif self.root_type and (self.root_type != arg.root_type) and operator == "AND": | ||||
|                 raise ValueError("All operands to an 'AND' expression must have the same object type") | ||||
|             elif self.root_type and (self.root_type != arg.root_type): | ||||
|                 self.root_type = None | ||||
|             self.operands.append(arg) | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         sub_exprs = [] | ||||
|         for o in self.operands: | ||||
|             sub_exprs.append("%s" % o) | ||||
|         return (" " + self.operator + " ").join(sub_exprs) | ||||
| 
 | ||||
| 
 | ||||
| class AndBooleanExpression(_BooleanExpression): | ||||
|     def __init__(self, operands): | ||||
|         super(AndBooleanExpression, self).__init__("AND", operands) | ||||
| 
 | ||||
| 
 | ||||
| class OrBooleanExpression(_BooleanExpression): | ||||
|     def __init__(self, operands): | ||||
|         super(OrBooleanExpression, self).__init__("OR", operands) | ||||
| 
 | ||||
| 
 | ||||
| class ObservationExpression(_PatternExpression): | ||||
|     def __init__(self, operand): | ||||
|         self.operand = operand | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "[%s]" % self.operand | ||||
| 
 | ||||
| 
 | ||||
| class _CompoundObservationExpression(_PatternExpression): | ||||
|     def __init__(self, operator, operands): | ||||
|         self.operator = operator | ||||
|         self.operands = operands | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         sub_exprs = [] | ||||
|         for o in self.operands: | ||||
|             sub_exprs.append("%s" % o) | ||||
|         return (" " + self.operator + " ").join(sub_exprs) | ||||
| 
 | ||||
| 
 | ||||
| class AndObservationExpression(_CompoundObservationExpression): | ||||
|     def __init__(self, operands): | ||||
|         super(AndObservationExpression, self).__init__("AND", operands) | ||||
| 
 | ||||
| 
 | ||||
| class OrObservationExpression(_CompoundObservationExpression): | ||||
|     def __init__(self, operands): | ||||
|         super(OrObservationExpression, self).__init__("OR", operands) | ||||
| 
 | ||||
| 
 | ||||
| class FollowedByObservationExpression(_CompoundObservationExpression): | ||||
|     def __init__(self, operands): | ||||
|         super(FollowedByObservationExpression, self).__init__("FOLLOWEDBY", operands) | ||||
| 
 | ||||
| 
 | ||||
| class ParentheticalExpression(_PatternExpression): | ||||
|     def __init__(self, exp): | ||||
|         self.expression = exp | ||||
|         if hasattr(exp, "root_type"): | ||||
|             self.root_type = exp.root_type | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "(%s)" % self.expression | ||||
| 
 | ||||
| 
 | ||||
| class _ExpressionQualifier(_PatternExpression): | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| class RepeatQualifier(_ExpressionQualifier): | ||||
|     def __init__(self, times_to_repeat): | ||||
|         if isinstance(times_to_repeat, IntegerConstant): | ||||
|             self.times_to_repeat = times_to_repeat | ||||
|         elif isinstance(times_to_repeat, int): | ||||
|             self.times_to_repeat = IntegerConstant(times_to_repeat) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat) | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "REPEATS %s TIMES" % self.times_to_repeat | ||||
| 
 | ||||
| 
 | ||||
| class WithinQualifier(_ExpressionQualifier): | ||||
|     def __init__(self, number_of_seconds): | ||||
|         if isinstance(number_of_seconds, IntegerConstant): | ||||
|             self.number_of_seconds = number_of_seconds | ||||
|         elif isinstance(number_of_seconds, int): | ||||
|             self.number_of_seconds = IntegerConstant(number_of_seconds) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds) | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "WITHIN %s SECONDS" % self.number_of_seconds | ||||
| 
 | ||||
| 
 | ||||
| class StartStopQualifier(_ExpressionQualifier): | ||||
|     def __init__(self, start_time, stop_time): | ||||
|         if isinstance(start_time, IntegerConstant): | ||||
|             self.start_time = start_time | ||||
|         elif isinstance(start_time, int): | ||||
|             self.start_time = IntegerConstant(start_time) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time) | ||||
|         if isinstance(stop_time, IntegerConstant): | ||||
|             self.stop_time = stop_time | ||||
|         elif isinstance(stop_time, int): | ||||
|             self.stop_time = IntegerConstant(stop_time) | ||||
|         else: | ||||
|             raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time) | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "START %s STOP %s" % (self.start_time, self.stop_time) | ||||
| 
 | ||||
| 
 | ||||
| class QualifiedObservationExpression(_PatternExpression): | ||||
|     def __init__(self, observation_expression, qualifier): | ||||
|         self.observation_expression = observation_expression | ||||
|         self.qualifier = qualifier | ||||
| 
 | ||||
|     def __str__(self): | ||||
|         return "%s %s" % (self.observation_expression, self.qualifier) | ||||
|  | @ -225,7 +225,7 @@ def test_parse_autonomous_system_valid(data): | |||
| 
 | ||||
| @pytest.mark.parametrize("data", [ | ||||
|     """{ | ||||
|         "type": "email-address", | ||||
|         "type": "email-addr", | ||||
|         "value": "john@example.com", | ||||
|         "display_name": "John Doe", | ||||
|         "belongs_to_ref": "0" | ||||
|  | @ -233,7 +233,7 @@ def test_parse_autonomous_system_valid(data): | |||
| ]) | ||||
| def test_parse_email_address(data): | ||||
|     odata = stix2.parse_observable(data, {"0": "user-account"}) | ||||
|     assert odata.type == "email-address" | ||||
|     assert odata.type == "email-addr" | ||||
| 
 | ||||
|     odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data) | ||||
|     with pytest.raises(stix2.exceptions.InvalidObjRefError): | ||||
|  |  | |||
|  | @ -0,0 +1,172 @@ | |||
| import stix2 | ||||
| 
 | ||||
| 
 | ||||
| def test_create_comparison_expression(): | ||||
| 
 | ||||
|     exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", | ||||
|                                              stix2.HashConstant("aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "SHA-256"))   # noqa | ||||
|     assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'" | ||||
| 
 | ||||
| 
 | ||||
| def test_boolean_expression(): | ||||
|     exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", | ||||
|                                              stix2.StringConstant(".+\\@example\\.com$")) | ||||
|     exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", | ||||
|                                              stix2.StringConstant("^Final Report.+\\.exe$")) | ||||
|     exp = stix2.AndBooleanExpression([exp1, exp2]) | ||||
|     assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_boolean_expression_with_parentheses(): | ||||
|     exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message", | ||||
|                                                               [stix2.ReferenceObjectPathComponent("from_ref"), | ||||
|                                                                stix2.BasicObjectPathComponent("value")]), | ||||
|                                              stix2.StringConstant(".+\\@example\\.com$")) | ||||
|     exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", | ||||
|                                              stix2.StringConstant("^Final Report.+\\.exe$")) | ||||
|     exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2])) | ||||
|     assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_hash_followed_by_registryKey_expression_python_constant(): | ||||
|     hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", | ||||
|                                                   stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) | ||||
|     o_exp1 = stix2.ObservationExpression(hash_exp) | ||||
|     reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), | ||||
|                                                  stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) | ||||
|     o_exp2 = stix2.ObservationExpression(reg_exp) | ||||
|     fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) | ||||
|     para_exp = stix2.ParentheticalExpression(fb_exp) | ||||
|     qual_exp = stix2.WithinQualifier(300) | ||||
|     exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) | ||||
|     assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_hash_followed_by_registryKey_expression(): | ||||
|     hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", | ||||
|                                                   stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) | ||||
|     o_exp1 = stix2.ObservationExpression(hash_exp) | ||||
|     reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), | ||||
|                                                  stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) | ||||
|     o_exp2 = stix2.ObservationExpression(reg_exp) | ||||
|     fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2]) | ||||
|     para_exp = stix2.ParentheticalExpression(fb_exp) | ||||
|     qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) | ||||
|     exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) | ||||
|     assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_file_observable_expression(): | ||||
|     exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", | ||||
|                                               stix2.HashConstant( | ||||
|                                                   "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", | ||||
|                                                   'SHA-256')) | ||||
|     exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf")) | ||||
|     bool_exp = stix2.AndBooleanExpression([exp1, exp2]) | ||||
|     exp = stix2.ObservationExpression(bool_exp) | ||||
|     assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_multiple_file_observable_expression(): | ||||
|     exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", | ||||
|                                               stix2.HashConstant( | ||||
|                                                   "bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c", | ||||
|                                                   'SHA-256')) | ||||
|     exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5", | ||||
|                                               stix2.HashConstant("cead3f77f6cda6ec00f57d76c9a6879f", "MD5")) | ||||
|     bool1_exp = stix2.OrBooleanExpression([exp1, exp2]) | ||||
|     exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", | ||||
|                                               stix2.HashConstant( | ||||
|                                                   "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", | ||||
|                                                   'SHA-256')) | ||||
|     op1_exp = stix2.ObservationExpression(bool1_exp) | ||||
|     op2_exp = stix2.ObservationExpression(exp3) | ||||
|     exp = stix2.AndObservationExpression([op1_exp, op2_exp]) | ||||
|     assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_root_types(): | ||||
|     ast = stix2.ObservationExpression( | ||||
|             stix2.AndBooleanExpression( | ||||
|                 [stix2.ParentheticalExpression( | ||||
|                     stix2.OrBooleanExpression([ | ||||
|                         stix2.EqualityComparisonExpression("a:b", stix2.StringConstant("1")), | ||||
|                         stix2.EqualityComparisonExpression("b:c", stix2.StringConstant("2"))])), | ||||
|                  stix2.EqualityComparisonExpression(u"b:d", stix2.StringConstant("3"))])) | ||||
|     assert str(ast) == "[(a:b = '1' OR b:c = '2') AND b:d = '3']" | ||||
| 
 | ||||
| 
 | ||||
| def test_artifact_payload(): | ||||
|     exp1 = stix2.EqualityComparisonExpression("artifact:mime_type", | ||||
|                                               "application/vnd.tcpdump.pcap") | ||||
|     exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", | ||||
|                                              stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) | ||||
|     and_exp = stix2.AndBooleanExpression([exp1, exp2]) | ||||
|     exp = stix2.ObservationExpression(and_exp) | ||||
|     assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_greater_than_python_constant(): | ||||
|     exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", | ||||
|                                                  7.0) | ||||
|     exp = stix2.ObservationExpression(exp1) | ||||
|     assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" | ||||
| 
 | ||||
| 
 | ||||
| def test_greater_than(): | ||||
|     exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", | ||||
|                                                  stix2.FloatConstant(7.0)) | ||||
|     exp = stix2.ObservationExpression(exp1) | ||||
|     assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" | ||||
| 
 | ||||
| 
 | ||||
| def test_and_observable_expression(): | ||||
|     exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", | ||||
|                                                                           "unix"), | ||||
|                                        stix2.EqualityComparisonExpression("user-account:user_id", | ||||
|                                                                           stix2.StringConstant("1007")), | ||||
|                                        stix2.EqualityComparisonExpression("user-account:account_login", | ||||
|                                                                           "Peter")]) | ||||
|     exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", | ||||
|                                                                           "unix"), | ||||
|                                        stix2.EqualityComparisonExpression("user-account:user_id", | ||||
|                                                                           stix2.StringConstant("1008")), | ||||
|                                        stix2.EqualityComparisonExpression("user-account:account_login", | ||||
|                                                                           "Paul")]) | ||||
|     exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type", | ||||
|                                                                           "unix"), | ||||
|                                        stix2.EqualityComparisonExpression("user-account:user_id", | ||||
|                                                                           stix2.StringConstant("1009")), | ||||
|                                        stix2.EqualityComparisonExpression("user-account:account_login", | ||||
|                                                                           "Mary")]) | ||||
|     exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1), | ||||
|                                          stix2.ObservationExpression(exp2), | ||||
|                                          stix2.ObservationExpression(exp3)]) | ||||
|     assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_hex(): | ||||
|     exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type", | ||||
|                                                                              "image/bmp"), | ||||
|                                           stix2.EqualityComparisonExpression("file:magic_number_hex", | ||||
|                                                                              stix2.HexConstant("ffd8"))]) | ||||
|     exp = stix2.ObservationExpression(exp_and) | ||||
|     assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" | ||||
| 
 | ||||
| 
 | ||||
| def test_multiple_qualifiers(): | ||||
|     exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type", | ||||
|                                                                              "domain-name"), | ||||
|                                           stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", | ||||
|                                                                              "example.com")]) | ||||
|     exp_ob = stix2.ObservationExpression(exp_and) | ||||
|     qual_rep = stix2.RepeatQualifier(5) | ||||
|     qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) | ||||
|     exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) | ||||
|     assert str(exp) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS"  # noqa | ||||
| 
 | ||||
| 
 | ||||
| def test_set_op(): | ||||
|     exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", | ||||
|                                                                          "2001:0db8:dead:beef:0000:0000:0000:0000/64")) | ||||
|     assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']" | ||||
		Loading…
	
		Reference in New Issue
	
	 Greg Back
						Greg Back