added classes for Pattern Expressions
parent
9ea65933c6
commit
c0467da5f9
|
@ -20,6 +20,16 @@ from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
|
|||
from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
|
||||
ExternalReference, GranularMarking, KillChainPhase,
|
||||
MarkingDefinition, StatementMarking, TLPMarking)
|
||||
from .pattern_expressions import (AndBooleanExpression,
|
||||
AndObservableExpression,
|
||||
EqualityComparisonExpression,
|
||||
FollowedByObservableExpression,
|
||||
MatchesComparisonExpression,
|
||||
ObservableExpression, OrBooleanExpression,
|
||||
OrObservableExpression,
|
||||
ParentheticalExpression,
|
||||
QualifiedObservationExpression,
|
||||
WithinQualifier)
|
||||
from .sdo import (AttackPattern, Campaign, CourseOfAction, CustomObject,
|
||||
Identity, Indicator, IntrusionSet, Malware, ObservedData,
|
||||
Report, ThreatActor, Tool, Vulnerability)
|
||||
|
|
|
@ -0,0 +1,187 @@
|
|||
from six import text_type
|
||||
|
||||
|
||||
class PatternExpression(object):
|
||||
|
||||
@staticmethod
|
||||
def get_root_from_object_path(lhs):
|
||||
path_as_parts = lhs.split(":")
|
||||
return path_as_parts[0]
|
||||
|
||||
@staticmethod
|
||||
def escape_quotes_and_backslashes(s):
|
||||
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
|
||||
|
||||
|
||||
class ComparisonExpression(PatternExpression):
|
||||
def __init__(self, operator, lhs, rhs, negated=False):
|
||||
if operator == "=" and isinstance(rhs, list):
|
||||
self.operator = "IN"
|
||||
else:
|
||||
self.operator = operator
|
||||
self.lhs = lhs
|
||||
self.rhs = rhs
|
||||
self.negated = negated
|
||||
self.root_type = self.get_root_from_object_path(lhs)
|
||||
|
||||
def __str__(self):
|
||||
if isinstance(self.rhs, list):
|
||||
final_rhs = []
|
||||
for r in self.rhs:
|
||||
final_rhs.append("'" + self.escape_quotes_and_backslashes(text_type(r)) + "'")
|
||||
rhs_string = "(" + ", ".join(final_rhs) + ")"
|
||||
else:
|
||||
rhs_string = "'" + self.escape_quotes_and_backslashes(text_type(self.rhs)) + "'"
|
||||
return self.lhs + (" NOT" if self.negated else "") + " " + self.operator + " " + rhs_string
|
||||
|
||||
|
||||
class EqualityComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(EqualityComparisonExpression, self).__init__("=", lhs, rhs, negated)
|
||||
|
||||
|
||||
class GreaterThanComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(GreaterThanComparisonExpression, self).__init__(">", lhs, rhs, negated)
|
||||
|
||||
|
||||
class LessThanComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(LessThanComparisonExpression, self).__init__("<", lhs, rhs, negated)
|
||||
|
||||
|
||||
class GreaterThanEqualComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(GreaterThanComparisonExpression, self).__init__(">=", lhs, rhs, negated)
|
||||
|
||||
|
||||
class LessThanEqualComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(LessThanComparisonExpression, self).__init__("<=", lhs, rhs, negated)
|
||||
|
||||
|
||||
class InComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(InComparisonExpression, self).__init__("IN", lhs, rhs, negated)
|
||||
|
||||
|
||||
class LikeComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(LikeComparisonExpression, self).__init__("LIKE", lhs, rhs, negated)
|
||||
|
||||
|
||||
class MatchesComparisonExpression(ComparisonExpression):
|
||||
def __init__(self, lhs, rhs, negated=False):
|
||||
super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated)
|
||||
|
||||
|
||||
# TODO: ISASUBSET, ISSUPERSET
|
||||
|
||||
|
||||
class BooleanExpression(PatternExpression):
|
||||
def __init__(self, operator, operands):
|
||||
self.operator = operator
|
||||
self.operands = []
|
||||
for arg in operands:
|
||||
if not hasattr(self, "root_type"):
|
||||
self.root_type = arg.root_type
|
||||
elif self.root_type and (self.root_type != arg.root_type) and operator == "AND":
|
||||
raise ValueError("This expression cannot have a mixed root type")
|
||||
self.operands.append(arg)
|
||||
|
||||
def __str__(self):
|
||||
sub_exprs = []
|
||||
for o in self.operands:
|
||||
sub_exprs.append(str(o))
|
||||
return (" " + self.operator + " ").join(sub_exprs)
|
||||
|
||||
|
||||
class AndBooleanExpression(BooleanExpression):
|
||||
def __init__(self, operands):
|
||||
super(AndBooleanExpression, self).__init__("AND", operands)
|
||||
|
||||
|
||||
class OrBooleanExpression(BooleanExpression):
|
||||
def __init__(self, operands):
|
||||
super(OrBooleanExpression, self).__init__("OR", operands)
|
||||
|
||||
|
||||
class ObservableExpression(PatternExpression):
|
||||
def __init__(self, operand):
|
||||
self.operand = operand
|
||||
|
||||
def __str__(self):
|
||||
return "[" + str(self.operand) + "]"
|
||||
|
||||
|
||||
class CompoundObservableExpression(PatternExpression):
|
||||
def __init__(self, operator, operands):
|
||||
self.operator = operator
|
||||
self.operands = operands
|
||||
|
||||
def __str__(self):
|
||||
sub_exprs = []
|
||||
for o in self.operands:
|
||||
sub_exprs.append(str(o))
|
||||
return (" " + self.operator + " ").join(sub_exprs)
|
||||
|
||||
|
||||
class AndObservableExpression(CompoundObservableExpression):
|
||||
def __init__(self, operands):
|
||||
super(AndObservableExpression, self).__init__("AND", operands)
|
||||
|
||||
|
||||
class OrObservableExpression(CompoundObservableExpression):
|
||||
def __init__(self, operands):
|
||||
super(OrObservableExpression, self).__init__("OR", operands)
|
||||
|
||||
|
||||
class FollowedByObservableExpression(CompoundObservableExpression):
|
||||
def __init__(self, operands):
|
||||
super(FollowedByObservableExpression, self).__init__("FOLLOWEDBY", operands)
|
||||
|
||||
|
||||
class ParentheticalExpression(PatternExpression):
|
||||
def __init__(self, exp):
|
||||
self.expression = exp
|
||||
|
||||
def __str__(self):
|
||||
return "(" + str(self.expression) + ")"
|
||||
|
||||
|
||||
class ExpressionQualifier(PatternExpression):
|
||||
pass
|
||||
|
||||
|
||||
class RepeatQualifier(ExpressionQualifier):
|
||||
def __init__(self, times_to_repeat):
|
||||
self.times_to_repeat = times_to_repeat
|
||||
|
||||
def __str__(self):
|
||||
return "REPEATS %s TIMES" % str(self.times_to_repeat)
|
||||
|
||||
|
||||
class WithinQualifier(ExpressionQualifier):
|
||||
def __init__(self, number_of_seconds):
|
||||
self.number_of_seconds = number_of_seconds
|
||||
|
||||
def __str__(self):
|
||||
return "WITHIN %s SECONDS" % (str(self.number_of_seconds))
|
||||
|
||||
|
||||
class StartStopQualifier(ExpressionQualifier):
|
||||
def __init__(self, start_time, stop_time):
|
||||
self.start_time = start_time
|
||||
self.stop_time = stop_time
|
||||
|
||||
def __str__(self):
|
||||
return "START %s STOP %s" % (str(self.start_time), str(self.stop_time))
|
||||
|
||||
|
||||
class QualifiedObservationExpression(PatternExpression):
|
||||
def __init__(self, observation_expression, qualifier):
|
||||
self.observation_expression = observation_expression
|
||||
self.qualifier = qualifier
|
||||
|
||||
def __str__(self):
|
||||
return str(self.observation_expression) + " " + str(self.qualifier)
|
|
@ -0,0 +1,58 @@
|
|||
import stix2
|
||||
|
||||
|
||||
def test_create_comparison_expression():
|
||||
|
||||
exp = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'", "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f")
|
||||
assert str(exp) == "file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f'"
|
||||
|
||||
|
||||
def test_boolean_expression():
|
||||
exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$")
|
||||
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$")
|
||||
exp = stix2.AndBooleanExpression([exp1, exp2])
|
||||
assert str(exp) == "email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$'" # noqa
|
||||
|
||||
|
||||
def test_boolean_expression_with_parentheses():
|
||||
exp1 = stix2.MatchesComparisonExpression("email-message:from_ref.value", ".+\\@example\\.com$")
|
||||
exp2 = stix2.MatchesComparisonExpression("email-message:body_multipart[*].body_raw_ref.name", "^Final Report.+\\.exe$")
|
||||
exp = stix2.ParentheticalExpression(stix2.AndBooleanExpression([exp1, exp2]))
|
||||
assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa
|
||||
|
||||
|
||||
def test_hash_followed_by_registryKey_expression():
|
||||
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
|
||||
"79054025255fb1a26e4bc422aef54eb4")
|
||||
o_exp1 = stix2.ObservableExpression(hash_exp)
|
||||
reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key",
|
||||
"HKEY_LOCAL_MACHINE\\foo\\bar")
|
||||
o_exp2 = stix2.ObservableExpression(reg_exp)
|
||||
fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2])
|
||||
para_exp = stix2.ParentheticalExpression(fb_exp)
|
||||
qual_exp = stix2.WithinQualifier(300)
|
||||
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
|
||||
assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa
|
||||
|
||||
|
||||
def test_file_observable_expression():
|
||||
exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
|
||||
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f")
|
||||
exp2 = stix2.EqualityComparisonExpression("file:mime_type", "application/x-pdf")
|
||||
bool_exp = stix2.AndBooleanExpression([exp1, exp2])
|
||||
exp = stix2.ObservableExpression(bool_exp)
|
||||
assert str(exp) == "[file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f' AND file:mime_type = 'application/x-pdf']" # noqa
|
||||
|
||||
|
||||
def test_multiple_file_observable_expression():
|
||||
exp1 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
|
||||
"bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c")
|
||||
exp2 = stix2.EqualityComparisonExpression("file:hashes.MD5",
|
||||
"cead3f77f6cda6ec00f57d76c9a6879f")
|
||||
bool1_exp = stix2.OrBooleanExpression([exp1, exp2])
|
||||
exp3 = stix2.EqualityComparisonExpression("file:hashes.'SHA-256'",
|
||||
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f")
|
||||
op1_exp = stix2.ObservableExpression(bool1_exp)
|
||||
op2_exp = stix2.ObservableExpression(exp3)
|
||||
exp = stix2.AndObservableExpression([op1_exp, op2_exp])
|
||||
assert str(exp) == "[file:hashes.'SHA-256' = 'bf07a7fbb825fc0aae7bf4a1177b2b31fcf8a3feeaf7092761e18c859ee52a9c' OR file:hashes.MD5 = 'cead3f77f6cda6ec00f57d76c9a6879f'] AND [file:hashes.'SHA-256' = 'aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f']" # noqa
|
Loading…
Reference in New Issue