merge all classes into patterns.py

stix2.1
Richard Piazza 2017-07-19 09:39:17 -04:00 committed by Greg Back
parent 979c09d8c0
commit a2aacc5e20
6 changed files with 460 additions and 461 deletions

View File

@ -4,9 +4,6 @@
from . import exceptions from . import exceptions
from .bundle import Bundle from .bundle import Bundle
from .constants import (FloatConstant, HashConstant, HexConstant,
IntegerConstant, StringConstant)
from .object_path import ObjectPath, ObjectPathComponent
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, CustomObservable, Directory, AutonomousSystem, CustomObservable, Directory,
DomainName, EmailAddress, EmailMessage, DomainName, EmailAddress, EmailMessage,
@ -23,25 +20,23 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE, from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
ExternalReference, GranularMarking, KillChainPhase, ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking) MarkingDefinition, StatementMarking, TLPMarking)
from .pattern_expressions import (AndBooleanExpression, from .patterns import (AndBooleanExpression, AndObservationExpression,
AndObservableExpression, BasicObjectPathComponent, EqualityComparisonExpression,
ComparisonExpression, FloatConstant, FollowedByObservationExpression,
EqualityComparisonExpression, GreaterThanComparisonExpression,
FollowedByObservableExpression, GreaterThanEqualComparisonExpression, HashConstant,
GreaterThanComparisonExpression, HexConstant, IntegerConstant,
GreaterThanEqualComparisonExpression, IsSubsetComparisonExpression,
IsSubsetComparisonExpression, IsSupersetComparisonExpression,
IsSupersetComparisonExpression, LessThanComparisonExpression,
LessThanComparisonExpression, LessThanEqualComparisonExpression,
LessThanEqualComparisonExpression, LikeComparisonExpression, ListConstant,
LikeComparisonExpression, ListObjectPathComponent, MatchesComparisonExpression,
MatchesComparisonExpression, ObjectPath, ObservationExpression, OrBooleanExpression,
ObservableExpression, OrBooleanExpression, OrObservationExpression, ParentheticalExpression,
OrObservableExpression, QualifiedObservationExpression,
ParentheticalExpression, ReferenceObjectPathComponent, RepeatQualifier,
QualifiedObservationExpression, StartStopQualifier, StringConstant, WithinQualifier)
RepeatQualifier, StartStopQualifier,
WithinQualifier)
from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject, from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject,
Identity, Indicator, IntrusionSet, Malware, ObservedData, Identity, Indicator, IntrusionSet, Malware, ObservedData,
Report, ThreatActor, Tool, Vulnerability) Report, ThreatActor, Tool, Vulnerability)

View File

@ -1,135 +0,0 @@
import base64
import binascii
import re
# TODO: REConstant?
# TODO: Timestamp
class Constant(object):
def __str__(self):
return "%s" % self.value
@staticmethod
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
class StringConstant(Constant):
def __init__(self, value):
self.value = value
def __str__(self):
return "'%s'" % StringConstant.escape_quotes_and_backslashes(self.value)
class IntegerConstant(Constant):
def __init__(self, value):
try:
self.value = int(value)
except Exception:
raise ValueError("must be an integer.")
class FloatConstant(Constant):
def __init__(self, value):
try:
self.value = float(value)
except Exception:
raise ValueError("must be an float.")
class BooleanConstant(Constant):
def __init__(self, value):
if isinstance(value, bool):
self.value = value
trues = ['true', 't']
falses = ['false', 'f']
try:
if value.lower() in trues:
self.value = True
if value.lower() in falses:
self.value = False
except AttributeError:
if value == 1:
self.value = True
if value == 0:
self.value = False
raise ValueError("must be a boolean value.")
_HASH_REGEX = {
"MD5": ("^[a-fA-F0-9]{32}$", "MD5"),
"MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"),
"RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"),
"SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"),
"SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"),
"SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"),
"SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"),
"SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"),
"SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"),
"SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"),
"SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"),
"SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"),
"SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"),
"WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"),
}
class HashConstant(StringConstant):
def __init__(self, value, type):
key = type.upper().replace('-', '')
if key in _HASH_REGEX:
vocab_key = _HASH_REGEX[key][1]
if not re.match(_HASH_REGEX[key][0], value):
raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key))
self.value = value
class BinaryConstant(Constant):
def __init__(self, value):
try:
base64.b64decode(value)
self.value = value
except (binascii.Error, TypeError):
raise ValueError("must contain a base64 encoded string")
def __str__(self):
return "b'%s'" % self.value
class HexConstant(Constant):
def __init__(self, value):
if not re.match('^([a-fA-F0-9]{2})+$', value):
raise ValueError("must contain an even number of hexadecimal characters")
self.value = value
def __str__(self):
return "h'%s'" % self.value
class ListConstant(Constant):
def __init__(self, values):
self.value = values
def __str__(self):
return "(" + ", ".join([("%s" % x) for x in self.value]) + ")"
def make_constant(value):
if isinstance(value, str):
return StringConstant(value)
elif isinstance(value, int):
return IntegerConstant(value)
elif isinstance(value, float):
return FloatConstant(value)
elif isinstance(value, list):
return ListConstant(value)
elif isinstance(value, bool):
return BooleanConstant(value)
else:
raise ValueError("Unable to create a constant from %s" % value)

View File

@ -1,59 +0,0 @@
class ObjectPathComponent(object):
pass
class BasicObjectPathComponent(ObjectPathComponent):
def __init__(self, property_name, is_key=False):
self.property_name = property_name
# TODO: set is_key to True if this component is a dictionary key
# self.is_key = is_key
def __str__(self):
return self.property_name
class ListObjectPathComponent(ObjectPathComponent):
def __init__(self, property_name, index):
self.property_name = property_name
self.index = index
def __str__(self):
return "%s[%s]" % (self.property_name, self.index)
class ReferenceObjectPathComponent(ObjectPathComponent):
def __init__(self, reference_property_name):
self.property_name = reference_property_name
def __str__(self):
return self.property_name
class ObjectPath(object):
def __init__(self, object_type_name, property_path):
self.object_type_name = object_type_name
self.property_path = [x if isinstance(x, ObjectPathComponent) else ObjectPath.create_ObjectPathComponent(x)
for x in property_path]
def __str__(self):
return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path]))
def merge(self, other):
self.property_path.extend(other.property_path)
return self
@staticmethod
def make_object_path(lhs):
path_as_parts = lhs.split(":")
return ObjectPath(path_as_parts[0], path_as_parts[1].split("."))
@staticmethod
def create_ObjectPathComponent(component_name):
if component_name.endswith("_ref"):
return ReferenceObjectPathComponent(component_name)
elif component_name.find("[") != -1:
parse1 = component_name.split("[")
return ListObjectPathComponent(parse1[0], parse1[1][:-1])
else:
return BasicObjectPathComponent(component_name)

View File

@ -1,223 +0,0 @@
from .constants import Constant, IntegerConstant, ListConstant, make_constant
from .object_path import ObjectPath
class PatternExpression(object):
@staticmethod
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
class ComparisonExpression(PatternExpression):
def __init__(self, operator, lhs, rhs, negated=False):
if operator == "=" and isinstance(rhs, ListConstant):
self.operator = "IN"
else:
self.operator = operator
if isinstance(lhs, ObjectPath):
self.lhs = lhs
else:
self.lhs = ObjectPath.make_object_path(lhs)
if isinstance(rhs, Constant):
self.rhs = rhs
else:
self.rhs = make_constant(rhs)
self.negated = negated
self.root_type = self.lhs.object_type_name
def __str__(self):
# if isinstance(self.rhs, list):
# final_rhs = []
# for r in self.rhs:
# final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'")
# rhs_string = "(" + ", ".join(final_rhs) + ")"
# else:
# rhs_string = self.rhs
if self.negated:
return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs)
else:
return "%s %s %s" % (self.lhs, self.operator, self.rhs)
class EqualityComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated)
class GreaterThanComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated)
class LessThanComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated)
class GreaterThanEqualComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(GreaterThanComparisonExpression, self).__init__(">=", lhs, rhs, negated)
class LessThanEqualComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LessThanComparisonExpression, self).__init__("<=", lhs, rhs, negated)
class InComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated)
class LikeComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated)
class MatchesComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated)
class IsSubsetComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated)
class IsSupersetComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated)
class BooleanExpression(PatternExpression):
def __init__(self, operator, operands):
self.operator = operator
self.operands = []
for arg in operands:
if not hasattr(self, "root_type"):
self.root_type = arg.root_type
elif self.root_type and (self.root_type != arg.root_type) and operator == "AND":
raise ValueError("This expression cannot have a mixed root type")
elif self.root_type and (self.root_type != arg.root_type):
self.root_type = None
self.operands.append(arg)
def __str__(self):
sub_exprs = []
for o in self.operands:
sub_exprs.append("%s" % o)
return (" " + self.operator + " ").join(sub_exprs)
class AndBooleanExpression(BooleanExpression):
def __init__(self, operands):
super(AndBooleanExpression, self).__init__("AND", operands)
class OrBooleanExpression(BooleanExpression):
def __init__(self, operands):
super(OrBooleanExpression, self).__init__("OR", operands)
class ObservableExpression(PatternExpression):
def __init__(self, operand):
self.operand = operand
def __str__(self):
return "[%s]" % self.operand
class CompoundObservableExpression(PatternExpression):
def __init__(self, operator, operands):
self.operator = operator
self.operands = operands
def __str__(self):
sub_exprs = []
for o in self.operands:
sub_exprs.append("%s" % o)
return (" " + self.operator + " ").join(sub_exprs)
class AndObservableExpression(CompoundObservableExpression):
def __init__(self, operands):
super(AndObservableExpression, self).__init__("AND", operands)
class OrObservableExpression(CompoundObservableExpression):
def __init__(self, operands):
super(OrObservableExpression, self).__init__("OR", operands)
class FollowedByObservableExpression(CompoundObservableExpression):
def __init__(self, operands):
super(FollowedByObservableExpression, self).__init__("FOLLOWEDBY", operands)
class ParentheticalExpression(PatternExpression):
def __init__(self, exp):
self.expression = exp
if hasattr(exp, "root_type"):
self.root_type = exp.root_type
def __str__(self):
return "(%s)" % self.expression
class ExpressionQualifier(PatternExpression):
pass
class RepeatQualifier(ExpressionQualifier):
def __init__(self, times_to_repeat):
if isinstance(times_to_repeat, IntegerConstant):
self.times_to_repeat = times_to_repeat
elif isinstance(times_to_repeat, int):
self.times_to_repeat = IntegerConstant(times_to_repeat)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat)
def __str__(self):
return "REPEATS %s TIMES" % self.times_to_repeat
class WithinQualifier(ExpressionQualifier):
def __init__(self, number_of_seconds):
if isinstance(number_of_seconds, IntegerConstant):
self.number_of_seconds = number_of_seconds
elif isinstance(number_of_seconds, int):
self.number_of_seconds = IntegerConstant(number_of_seconds)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds)
def __str__(self):
return "WITHIN %s SECONDS" % self.number_of_seconds
class StartStopQualifier(ExpressionQualifier):
def __init__(self, start_time, stop_time):
if isinstance(start_time, IntegerConstant):
self.start_time = start_time
elif isinstance(start_time, int):
self.start_time = IntegerConstant(start_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time)
if isinstance(stop_time, IntegerConstant):
self.stop_time = stop_time
elif isinstance(stop_time, int):
self.stop_time = IntegerConstant(stop_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time)
def __str__(self):
return "START %s STOP %s" % (self.start_time, self.stop_time)
class QualifiedObservationExpression(PatternExpression):
def __init__(self, observation_expression, qualifier):
self.observation_expression = observation_expression
self.qualifier = qualifier
def __str__(self):
return "%s %s" % (self.observation_expression, self.qualifier)

419
stix2/patterns.py Normal file
View File

@ -0,0 +1,419 @@
import base64
import binascii
import re
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
class _Constant(object):
pass
class StringConstant(_Constant):
def __init__(self, value):
self.value = value
def __str__(self):
return "'%s'" % escape_quotes_and_backslashes(self.value)
class IntegerConstant(_Constant):
def __init__(self, value):
try:
self.value = int(value)
except Exception:
raise ValueError("must be an integer.")
def __str__(self):
return "%s" % self.value
class FloatConstant(_Constant):
def __init__(self, value):
try:
self.value = float(value)
except Exception:
raise ValueError("must be an float.")
def __str__(self):
return "%s" % self.value
class BooleanConstant(_Constant):
def __init__(self, value):
if isinstance(value, bool):
self.value = value
trues = ['true', 't']
falses = ['false', 'f']
try:
if value.lower() in trues:
self.value = True
if value.lower() in falses:
self.value = False
except AttributeError:
if value == 1:
self.value = True
if value == 0:
self.value = False
raise ValueError("must be a boolean value.")
def __str__(self):
return "%s" % self.value
_HASH_REGEX = {
"MD5": ("^[a-fA-F0-9]{32}$", "MD5"),
"MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"),
"RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"),
"SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"),
"SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"),
"SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"),
"SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"),
"SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"),
"SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"),
"SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"),
"SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"),
"SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"),
"SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"),
"WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"),
}
class HashConstant(StringConstant):
def __init__(self, value, type):
key = type.upper().replace('-', '')
if key in _HASH_REGEX:
vocab_key = _HASH_REGEX[key][1]
if not re.match(_HASH_REGEX[key][0], value):
raise ValueError("'%s' is not a valid %s hash" % (value, vocab_key))
self.value = value
class BinaryConstant(_Constant):
def __init__(self, value):
try:
base64.b64decode(value)
self.value = value
except (binascii.Error, TypeError):
raise ValueError("must contain a base64 encoded string")
def __str__(self):
return "b'%s'" % self.value
class HexConstant(_Constant):
def __init__(self, value):
if not re.match('^([a-fA-F0-9]{2})+$', value):
raise ValueError("must contain an even number of hexadecimal characters")
self.value = value
def __str__(self):
return "h'%s'" % self.value
class ListConstant(_Constant):
def __init__(self, values):
self.value = values
def __str__(self):
return "(" + ", ".join([("%s" % x) for x in self.value]) + ")"
def make_constant(value):
if isinstance(value, str):
return StringConstant(value)
elif isinstance(value, int):
return IntegerConstant(value)
elif isinstance(value, float):
return FloatConstant(value)
elif isinstance(value, list):
return ListConstant(value)
elif isinstance(value, bool):
return BooleanConstant(value)
else:
raise ValueError("Unable to create a constant from %s" % value)
class _ObjectPathComponent(object):
@staticmethod
def create_ObjectPathComponent(component_name):
if component_name.endswith("_ref"):
return ReferenceObjectPathComponent(component_name)
elif component_name.find("[") != -1:
parse1 = component_name.split("[")
return ListObjectPathComponent(parse1[0], parse1[1][:-1])
else:
return BasicObjectPathComponent(component_name)
class BasicObjectPathComponent(_ObjectPathComponent):
def __init__(self, property_name, is_key=False):
self.property_name = property_name
# TODO: set is_key to True if this component is a dictionary key
# self.is_key = is_key
def __str__(self):
return self.property_name
class ListObjectPathComponent(_ObjectPathComponent):
def __init__(self, property_name, index):
self.property_name = property_name
self.index = index
def __str__(self):
return "%s[%s]" % (self.property_name, self.index)
class ReferenceObjectPathComponent(_ObjectPathComponent):
def __init__(self, reference_property_name):
self.property_name = reference_property_name
def __str__(self):
return self.property_name
class ObjectPath(object):
def __init__(self, object_type_name, property_path):
self.object_type_name = object_type_name
self.property_path = [x if isinstance(x, _ObjectPathComponent) else
_ObjectPathComponent.create_ObjectPathComponent(x)
for x in property_path]
def __str__(self):
return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path]))
def merge(self, other):
self.property_path.extend(other.property_path)
return self
@staticmethod
def make_object_path(lhs):
path_as_parts = lhs.split(":")
return ObjectPath(path_as_parts[0], path_as_parts[1].split("."))
class _PatternExpression(object):
@staticmethod
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
class _ComparisonExpression(_PatternExpression):
def __init__(self, operator, lhs, rhs, negated=False):
if operator == "=" and isinstance(rhs, ListConstant):
self.operator = "IN"
else:
self.operator = operator
if isinstance(lhs, ObjectPath):
self.lhs = lhs
else:
self.lhs = ObjectPath.make_object_path(lhs)
if isinstance(rhs, _Constant):
self.rhs = rhs
else:
self.rhs = make_constant(rhs)
self.negated = negated
self.root_type = self.lhs.object_type_name
def __str__(self):
# if isinstance(self.rhs, list):
# final_rhs = []
# for r in self.rhs:
# final_rhs.append("'" + self.escape_quotes_and_backslashes("%s" % r) + "'")
# rhs_string = "(" + ", ".join(final_rhs) + ")"
# else:
# rhs_string = self.rhs
if self.negated:
return "%s NOT %s %s" % (self.lhs, self.operator, self.rhs)
else:
return "%s %s %s" % (self.lhs, self.operator, self.rhs)
class EqualityComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated)
class GreaterThanComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated)
class LessThanComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated)
class GreaterThanEqualComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(GreaterThanEqualComparisonExpression, self).__init__(">=", lhs, rhs, negated)
class LessThanEqualComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LessThanEqualComparisonExpression, self).__init__("<=", lhs, rhs, negated)
class InComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated)
class LikeComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated)
class MatchesComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated)
class IsSubsetComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated)
class IsSupersetComparisonExpression(_ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated)
class _BooleanExpression(_PatternExpression):
def __init__(self, operator, operands):
self.operator = operator
self.operands = []
for arg in operands:
if not hasattr(self, "root_type"):
self.root_type = arg.root_type
elif self.root_type and (self.root_type != arg.root_type) and operator == "AND":
raise ValueError("All operands to an 'AND' expression must have the same object type")
elif self.root_type and (self.root_type != arg.root_type):
self.root_type = None
self.operands.append(arg)
def __str__(self):
sub_exprs = []
for o in self.operands:
sub_exprs.append("%s" % o)
return (" " + self.operator + " ").join(sub_exprs)
class AndBooleanExpression(_BooleanExpression):
def __init__(self, operands):
super(AndBooleanExpression, self).__init__("AND", operands)
class OrBooleanExpression(_BooleanExpression):
def __init__(self, operands):
super(OrBooleanExpression, self).__init__("OR", operands)
class ObservationExpression(_PatternExpression):
def __init__(self, operand):
self.operand = operand
def __str__(self):
return "[%s]" % self.operand
class _CompoundObservationExpression(_PatternExpression):
def __init__(self, operator, operands):
self.operator = operator
self.operands = operands
def __str__(self):
sub_exprs = []
for o in self.operands:
sub_exprs.append("%s" % o)
return (" " + self.operator + " ").join(sub_exprs)
class AndObservationExpression(_CompoundObservationExpression):
def __init__(self, operands):
super(AndObservationExpression, self).__init__("AND", operands)
class OrObservationExpression(_CompoundObservationExpression):
def __init__(self, operands):
super(OrObservationExpression, self).__init__("OR", operands)
class FollowedByObservationExpression(_CompoundObservationExpression):
def __init__(self, operands):
super(FollowedByObservationExpression, self).__init__("FOLLOWEDBY", operands)
class ParentheticalExpression(_PatternExpression):
def __init__(self, exp):
self.expression = exp
if hasattr(exp, "root_type"):
self.root_type = exp.root_type
def __str__(self):
return "(%s)" % self.expression
class _ExpressionQualifier(_PatternExpression):
pass
class RepeatQualifier(_ExpressionQualifier):
def __init__(self, times_to_repeat):
if isinstance(times_to_repeat, IntegerConstant):
self.times_to_repeat = times_to_repeat
elif isinstance(times_to_repeat, int):
self.times_to_repeat = IntegerConstant(times_to_repeat)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat)
def __str__(self):
return "REPEATS %s TIMES" % self.times_to_repeat
class WithinQualifier(_ExpressionQualifier):
def __init__(self, number_of_seconds):
if isinstance(number_of_seconds, IntegerConstant):
self.number_of_seconds = number_of_seconds
elif isinstance(number_of_seconds, int):
self.number_of_seconds = IntegerConstant(number_of_seconds)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds)
def __str__(self):
return "WITHIN %s SECONDS" % self.number_of_seconds
class StartStopQualifier(_ExpressionQualifier):
def __init__(self, start_time, stop_time):
if isinstance(start_time, IntegerConstant):
self.start_time = start_time
elif isinstance(start_time, int):
self.start_time = IntegerConstant(start_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time)
if isinstance(stop_time, IntegerConstant):
self.stop_time = stop_time
elif isinstance(stop_time, int):
self.stop_time = IntegerConstant(stop_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time)
def __str__(self):
return "START %s STOP %s" % (self.start_time, self.stop_time)
class QualifiedObservationExpression(_PatternExpression):
def __init__(self, observation_expression, qualifier):
self.observation_expression = observation_expression
self.qualifier = qualifier
def __str__(self):
return "%s %s" % (self.observation_expression, self.qualifier)

View File

@ -18,7 +18,9 @@ def test_boolean_expression():
def test_boolean_expression_with_parentheses(): def test_boolean_expression_with_parentheses():
exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", exp1 = stix2.MatchesComparisonExpression(stix2.ObjectPath("email-message",
[stix2.ReferenceObjectPathComponent("from_ref"),
stix2.BasicObjectPathComponent("value")]),
stix2.StringConstant(".+\\@example\\.com$")) stix2.StringConstant(".+\\@example\\.com$"))
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name",
stix2.StringConstant("^Final Report.+\\.exe$")) stix2.StringConstant("^Final Report.+\\.exe$"))
@ -29,11 +31,11 @@ def test_boolean_expression_with_parentheses():
def test_hash_followed_by_registryKey_expression_python_constant(): def test_hash_followed_by_registryKey_expression_python_constant():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
o_exp1 = stix2.ObservableExpression(hash_exp) o_exp1 = stix2.ObservationExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
o_exp2 = stix2.ObservableExpression(reg_exp) o_exp2 = stix2.ObservationExpression(reg_exp)
fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp) para_exp = stix2.ParentheticalExpression(fb_exp)
qual_exp = stix2.WithinQualifier(300) qual_exp = stix2.WithinQualifier(300)
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
@ -43,11 +45,11 @@ def test_hash_followed_by_registryKey_expression_python_constant():
def test_hash_followed_by_registryKey_expression(): def test_hash_followed_by_registryKey_expression():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5", hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5")) stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
o_exp1 = stix2.ObservableExpression(hash_exp) o_exp1 = stix2.ObservationExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]), reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar")) stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
o_exp2 = stix2.ObservableExpression(reg_exp) o_exp2 = stix2.ObservationExpression(reg_exp)
fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2]) fb_exp = stix2.FollowedByObservationExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp) para_exp = stix2.ParentheticalExpression(fb_exp)
qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300)) qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300))
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp) exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
@ -61,7 +63,7 @@ def test_file_observable_expression():
'SHA-256')) 'SHA-256'))
exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf")) exp2 = stix2.EqualityComparisonExpression("file:mime_type", stix2.StringConstant("application/x-pdf"))
bool_exp = stix2.AndBooleanExpression([exp1, exp2]) bool_exp = stix2.AndBooleanExpression([exp1, exp2])
exp = stix2.ObservableExpression(bool_exp) exp = stix2.ObservationExpression(bool_exp)
assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa
@ -77,14 +79,14 @@ def test_multiple_file_observable_expression():
stix2.HashConstant( stix2.HashConstant(
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f", "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
'SHA-256')) 'SHA-256'))
op1_exp = stix2.ObservableExpression(bool1_exp) op1_exp = stix2.ObservationExpression(bool1_exp)
op2_exp = stix2.ObservableExpression(exp3) op2_exp = stix2.ObservationExpression(exp3)
exp = stix2.AndObservableExpression([op1_exp, op2_exp]) exp = stix2.AndObservationExpression([op1_exp, op2_exp])
assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa
def test_root_types(): def test_root_types():
ast = stix2.ObservableExpression( ast = stix2.ObservationExpression(
stix2.AndBooleanExpression( stix2.AndBooleanExpression(
[stix2.ParentheticalExpression( [stix2.ParentheticalExpression(
stix2.OrBooleanExpression([ stix2.OrBooleanExpression([
@ -100,21 +102,21 @@ def test_artifact_payload():
exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin", exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin",
stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00")) stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00"))
and_exp = stix2.AndBooleanExpression([exp1, exp2]) and_exp = stix2.AndBooleanExpression([exp1, exp2])
exp = stix2.ObservableExpression(and_exp) exp = stix2.ObservationExpression(and_exp)
assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa
def test_greater_than_python_constant(): def test_greater_than_python_constant():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
7.0) 7.0)
exp = stix2.ObservableExpression(exp1) exp = stix2.ObservationExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
def test_greater_than(): def test_greater_than():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy", exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
stix2.FloatConstant(7.0)) stix2.FloatConstant(7.0))
exp = stix2.ObservableExpression(exp1) exp = stix2.ObservationExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]" assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
@ -137,9 +139,9 @@ def test_and_observable_expression():
stix2.StringConstant("1009")), stix2.StringConstant("1009")),
stix2.EqualityComparisonExpression("user-account:account_login", stix2.EqualityComparisonExpression("user-account:account_login",
"Mary")]) "Mary")])
exp = stix2.AndObservableExpression([stix2.ObservableExpression(exp1), exp = stix2.AndObservationExpression([stix2.ObservationExpression(exp1),
stix2.ObservableExpression(exp2), stix2.ObservationExpression(exp2),
stix2.ObservableExpression(exp3)]) stix2.ObservationExpression(exp3)])
assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa
@ -148,7 +150,7 @@ def test_hex():
"image/bmp"), "image/bmp"),
stix2.EqualityComparisonExpression("file:magic_number_hex", stix2.EqualityComparisonExpression("file:magic_number_hex",
stix2.HexConstant("ffd8"))]) stix2.HexConstant("ffd8"))])
exp = stix2.ObservableExpression(exp_and) exp = stix2.ObservationExpression(exp_and)
assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']" assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']"
@ -157,7 +159,7 @@ def test_multiple_qualifiers():
"domain-name"), "domain-name"),
stix2.EqualityComparisonExpression("network-traffic:dst_ref.value", stix2.EqualityComparisonExpression("network-traffic:dst_ref.value",
"example.com")]) "example.com")])
exp_ob = stix2.ObservableExpression(exp_and) exp_ob = stix2.ObservationExpression(exp_and)
qual_rep = stix2.RepeatQualifier(5) qual_rep = stix2.RepeatQualifier(5)
qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800)) qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800))
exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within) exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within)
@ -165,6 +167,6 @@ def test_multiple_qualifiers():
def test_set_op(): def test_set_op():
exp = stix2.ObservableExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value", exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64")) "2001:0db8:dead:beef:0000:0000:0000:0000/64"))
assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']" assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"