added object_paths

added more tests for pattern expressions
added "set" comparison expressions
implemented make_constant
fixed type name for EmailAddress
stix2.1
Richard Piazza 2017-07-12 17:02:51 -04:00 committed by Greg Back
parent c1b07ef505
commit 6fa009e509
7 changed files with 197 additions and 26 deletions

View File

@ -4,8 +4,9 @@
from . import exceptions
from .bundle import Bundle
from .constants import (FloatConstant, HashConstant, IntegerConstant,
StringConstant)
from .constants import (FloatConstant, HashConstant, HexConstant,
IntegerConstant, StringConstant)
from .object_path import ObjectPath, ObjectPathComponent
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, CustomObservable, Directory,
DomainName, EmailAddress, EmailMessage,
@ -29,6 +30,8 @@ from .pattern_expressions import (AndBooleanExpression,
FollowedByObservableExpression,
GreaterThanComparisonExpression,
GreaterThanEqualComparisonExpression,
IsSubsetComparisonExpression,
IsSupersetComparisonExpression,
LessThanComparisonExpression,
LessThanEqualComparisonExpression,
LikeComparisonExpression,

View File

@ -121,5 +121,15 @@ class ListConstant(Constant):
def make_constant(value):
# TODO: Stub
pass
if isinstance(value, str):
return StringConstant(value)
elif isinstance(value, int):
return IntegerConstant(value)
elif isinstance(value, float):
return FloatConstant(value)
elif isinstance(value, list):
return ListConstant(value)
elif isinstance(value, bool):
return BooleanConstant(value)
else:
raise ValueError("Unable to create a constant from %s" % value)

59
stix2/object_path.py Normal file
View File

@ -0,0 +1,59 @@
class ObjectPathComponent(object):
pass
class BasicObjectPathComponent(ObjectPathComponent):
def __init__(self, property_name, is_key=False):
self.property_name = property_name
# TODO: set is_key to True if this component is a dictionary key
# self.is_key = is_key
def __str__(self):
return self.property_name
class ListObjectPathComponent(ObjectPathComponent):
def __init__(self, property_name, index):
self.property_name = property_name
self.index = index
def __str__(self):
return "%s[%s]" % (self.property_name, self.index)
class ReferenceObjectPathComponent(ObjectPathComponent):
def __init__(self, reference_property_name):
self.property_name = reference_property_name
def __str__(self):
return self.property_name
class ObjectPath(object):
def __init__(self, object_type_name, property_path):
self.object_type_name = object_type_name
self.property_path = [x if isinstance(x, ObjectPathComponent) else ObjectPath.create_ObjectPathComponent(x)
for x in property_path]
def __str__(self):
return "%s:%s" % (self.object_type_name, ".".join(["%s" % x for x in self.property_path]))
def merge(self, other):
self.property_path.extend(other.property_path)
return self
@staticmethod
def make_object_path(lhs):
path_as_parts = lhs.split(":")
return ObjectPath(path_as_parts[0], path_as_parts[1].split("."))
@staticmethod
def create_ObjectPathComponent(component_name):
if component_name.endswith("_ref"):
return ReferenceObjectPathComponent(component_name)
elif component_name.find("[") != -1:
parse1 = component_name.split("[")
return ListObjectPathComponent(parse1[0], parse1[1][:-1])
else:
return BasicObjectPathComponent(component_name)

View File

@ -123,7 +123,7 @@ class DomainName(_Observable):
class EmailAddress(_Observable):
_type = 'email-address'
_type = 'email-addr'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
@ -651,7 +651,7 @@ OBJ_MAP_OBSERVABLE = {
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-addr': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,

View File

@ -1,13 +1,9 @@
from .constants import ListConstant, make_constant
from .constants import Constant, IntegerConstant, ListConstant, make_constant
from .object_path import ObjectPath
class PatternExpression(object):
@staticmethod
def get_root_from_object_path(lhs):
path_as_parts = lhs.split(":")
return path_as_parts[0]
@staticmethod
def escape_quotes_and_backslashes(s):
return s.replace(u'\\', u'\\\\').replace(u"'", u"\\'")
@ -19,13 +15,16 @@ class ComparisonExpression(PatternExpression):
self.operator = "IN"
else:
self.operator = operator
self.lhs = lhs
if isinstance(rhs, str):
self.rhs = make_constant(rhs)
if isinstance(lhs, ObjectPath):
self.lhs = lhs
else:
self.lhs = ObjectPath.make_object_path(lhs)
if isinstance(rhs, Constant):
self.rhs = rhs
else:
self.rhs = make_constant(rhs)
self.negated = negated
self.root_type = self.get_root_from_object_path(lhs)
self.root_type = self.lhs.object_type_name
def __str__(self):
# if isinstance(self.rhs, list):
@ -81,7 +80,14 @@ class MatchesComparisonExpression(ComparisonExpression):
super(MatchesComparisonExpression, self).__init__("MATCHES", lhs, rhs, negated)
# TODO: ISASUBSET, ISSUPERSET
class IsSubsetComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSubsetComparisonExpression, self).__init__("ISSUBSET", lhs, rhs, negated)
class IsSupersetComparisonExpression(ComparisonExpression):
def __init__(self, lhs, rhs, negated=False):
super(IsSupersetComparisonExpression, self).__init__("ISSUPERSET", lhs, rhs, negated)
class BooleanExpression(PatternExpression):
@ -165,7 +171,12 @@ class ExpressionQualifier(PatternExpression):
class RepeatQualifier(ExpressionQualifier):
def __init__(self, times_to_repeat):
self.times_to_repeat = times_to_repeat
if isinstance(times_to_repeat, IntegerConstant):
self.times_to_repeat = times_to_repeat
elif isinstance(times_to_repeat, int):
self.times_to_repeat = IntegerConstant(times_to_repeat)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % times_to_repeat)
def __str__(self):
return "REPEATS %s TIMES" % self.times_to_repeat
@ -173,7 +184,12 @@ class RepeatQualifier(ExpressionQualifier):
class WithinQualifier(ExpressionQualifier):
def __init__(self, number_of_seconds):
self.number_of_seconds = number_of_seconds
if isinstance(number_of_seconds, IntegerConstant):
self.number_of_seconds = number_of_seconds
elif isinstance(number_of_seconds, int):
self.number_of_seconds = IntegerConstant(number_of_seconds)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % number_of_seconds)
def __str__(self):
return "WITHIN %s SECONDS" % self.number_of_seconds
@ -181,8 +197,18 @@ class WithinQualifier(ExpressionQualifier):
class StartStopQualifier(ExpressionQualifier):
def __init__(self, start_time, stop_time):
self.start_time = start_time
self.stop_time = stop_time
if isinstance(start_time, IntegerConstant):
self.start_time = start_time
elif isinstance(start_time, int):
self.start_time = IntegerConstant(start_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % start_time)
if isinstance(stop_time, IntegerConstant):
self.stop_time = stop_time
elif isinstance(stop_time, int):
self.stop_time = IntegerConstant(stop_time)
else:
raise ValueError("%s is not a valid argument for a Within Qualifier" % stop_time)
def __str__(self):
return "START %s STOP %s" % (self.start_time, self.stop_time)

View File

@ -225,7 +225,7 @@ def test_parse_autonomous_system_valid(data):
@pytest.mark.parametrize("data", [
"""{
"type": "email-address",
"type": "email-addr",
"value": "john@example.com",
"display_name": "John Doe",
"belongs_to_ref": "0"
@ -233,7 +233,7 @@ def test_parse_autonomous_system_valid(data):
])
def test_parse_email_address(data):
odata = stix2.parse_observable(data, {"0": "user-account"})
assert odata.type == "email-address"
assert odata.type == "email-addr"
odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data)
with pytest.raises(stix2.exceptions.InvalidObjRefError):

View File

@ -26,18 +26,32 @@ def test_boolean_expression_with_parentheses():
assert str(exp) == "(email-message:from_ref.value MATCHES '.+\\\\@example\\\\.com$' AND email-message:body_multipart[*].body_raw_ref.name MATCHES '^Final Report.+\\\\.exe$')" # noqa
def test_hash_followed_by_registryKey_expression_python_constant():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
o_exp1 = stix2.ObservableExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
o_exp2 = stix2.ObservableExpression(reg_exp)
fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp)
qual_exp = stix2.WithinQualifier(300)
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa
def test_hash_followed_by_registryKey_expression():
hash_exp = stix2.EqualityComparisonExpression("file:hashes.MD5",
stix2.HashConstant("79054025255fb1a26e4bc422aef54eb4", "MD5"))
o_exp1 = stix2.ObservableExpression(hash_exp)
reg_exp = stix2.EqualityComparisonExpression("win-registry-key:key",
reg_exp = stix2.EqualityComparisonExpression(stix2.ObjectPath("windows-registry-key", ["key"]),
stix2.StringConstant("HKEY_LOCAL_MACHINE\\foo\\bar"))
o_exp2 = stix2.ObservableExpression(reg_exp)
fb_exp = stix2.FollowedByObservableExpression([o_exp1, o_exp2])
para_exp = stix2.ParentheticalExpression(fb_exp)
qual_exp = stix2.WithinQualifier(stix2.IntegerConstant(300))
exp = stix2.QualifiedObservationExpression(para_exp, qual_exp)
assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [win-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa
assert str(exp) == "([file:hashes.MD5 = '79054025255fb1a26e4bc422aef54eb4'] FOLLOWEDBY [windows-registry-key:key = 'HKEY_LOCAL_MACHINE\\\\foo\\\\bar']) WITHIN 300 SECONDS" # noqa
def test_file_observable_expression():
@ -82,7 +96,7 @@ def test_root_types():
def test_artifact_payload():
exp1 = stix2.EqualityComparisonExpression("artifact:mime_type",
stix2.StringConstant("application/vnd.tcpdump.pcap"))
"application/vnd.tcpdump.pcap")
exp2 = stix2.MatchesComparisonExpression("artifact:payload_bin",
stix2.StringConstant("\\xd4\\xc3\\xb2\\xa1\\x02\\x00\\x04\\x00"))
and_exp = stix2.AndBooleanExpression([exp1, exp2])
@ -90,8 +104,67 @@ def test_artifact_payload():
assert str(exp) == "[artifact:mime_type = 'application/vnd.tcpdump.pcap' AND artifact:payload_bin MATCHES '\\\\xd4\\\\xc3\\\\xb2\\\\xa1\\\\x02\\\\x00\\\\x04\\\\x00']" # noqa
def test_greater_than_python_constant():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
7.0)
exp = stix2.ObservableExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
def test_greater_than():
exp1 = stix2.GreaterThanComparisonExpression("file:extensions.windows-pebinary-ext.sections[*].entropy",
stix2.FloatConstant(7.0))
exp = stix2.ObservableExpression(exp1)
assert str(exp) == "[file:extensions.windows-pebinary-ext.sections[*].entropy > 7.0]"
def test_and_observable_expression():
exp1 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1007")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Peter")])
exp2 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1008")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Paul")])
exp3 = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("user-account:account_type",
"unix"),
stix2.EqualityComparisonExpression("user-account:user_id",
stix2.StringConstant("1009")),
stix2.EqualityComparisonExpression("user-account:account_login",
"Mary")])
exp = stix2.AndObservableExpression([stix2.ObservableExpression(exp1),
stix2.ObservableExpression(exp2),
stix2.ObservableExpression(exp3)])
assert str(exp) == "[user-account:account_type = 'unix' AND user-account:user_id = '1007' AND user-account:account_login = 'Peter'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1008' AND user-account:account_login = 'Paul'] AND [user-account:account_type = 'unix' AND user-account:user_id = '1009' AND user-account:account_login = 'Mary']" # noqa
def test_hex():
exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("file:mime_type",
"image/bmp"),
stix2.EqualityComparisonExpression("file:magic_number_hex",
stix2.HexConstant("ffd8"))])
exp = stix2.ObservableExpression(exp_and)
assert str(exp) == "[file:mime_type = 'image/bmp' AND file:magic_number_hex = h'ffd8']"
def test_multiple_qualifiers():
exp_and = stix2.AndBooleanExpression([stix2.EqualityComparisonExpression("network-traffic:dst_ref.type",
"domain-name"),
stix2.EqualityComparisonExpression("network-traffic:dst_ref.value",
"example.com")])
exp_ob = stix2.ObservableExpression(exp_and)
qual_rep = stix2.RepeatQualifier(5)
qual_within = stix2.WithinQualifier(stix2.IntegerConstant(1800))
exp = stix2.QualifiedObservationExpression(stix2.QualifiedObservationExpression(exp_ob, qual_rep), qual_within)
assert str(exp) == "[network-traffic:dst_ref.type = 'domain-name' AND network-traffic:dst_ref.value = 'example.com'] REPEATS 5 TIMES WITHIN 1800 SECONDS" # noqa
def test_set_op():
exp = stix2.ObservableExpression(stix2.IsSubsetComparisonExpression("network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64"))
assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"