Merge pull request #8 from rpiazza/cyber-observables

Add interproperty constraints
stix2.1
Chris Lenk 2017-05-12 11:32:57 -04:00 committed by GitHub
commit 84cdc1c204
5 changed files with 351 additions and 30 deletions

View File

@ -5,11 +5,10 @@
from . import exceptions
from .bundle import Bundle
from .observables import (URL, Artifact, AutonomousSystem, Directory,
DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, File, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, Process, Software,
UserAccount, WindowsRegistryKey,
WindowsRegistryValueType, X509Certificate)
DomainName, EmailAddress, EmailMessage, EmailMIMEComponent, File,
IPv4Address, IPv6Address, MACAddress, Mutex,
NetworkTraffic, Process, Software, UserAccount,
WindowsRegistryKey, WindowsRegistryValueType, X509Certificate)
from .other import (ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,

View File

@ -5,9 +5,9 @@ import copy
import datetime as dt
import json
from .exceptions import (ExtraFieldsError, ImmutableError, InvalidObjRefError,
InvalidValueError, MissingFieldsError, RevokeError,
UnmodifiablePropertyError)
from .exceptions import (AtLeastOnePropertyError, DependentPropertiestError, ExtraFieldsError, ImmutableError,
InvalidObjRefError, InvalidValueError, MissingFieldsError, MutuallyExclusivePropertiesError,
RevokeError, UnmodifiablePropertyError)
from .utils import NOW, format_datetime, get_timestamp, parse_into_datetime
__all__ = ['STIXJSONEncoder', '_STIXBase']
@ -47,6 +47,36 @@ class _STIXBase(collections.Mapping):
except ValueError as exc:
raise InvalidValueError(self.__class__, prop_name, reason=str(exc))
# interproperty constraint methods
def _check_mutually_exclusive_properties(self, list_of_properties, at_least_one=True):
count = 0
current_properties = self.properties_populated()
for x in list_of_properties:
if x in current_properties:
count += 1
# at_least_one allows for xor to be checked
if count > 1 or (at_least_one and count == 0):
raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties)
def _check_at_least_one_property(self, list_of_properties):
current_properties = self.properties_populated()
for x in list_of_properties:
if x in current_properties:
return
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties, values=[]):
failed_dependency_pairs = []
current_properties = self.properties_populated()
for p in list_of_properties:
v = values.pop() if values else None
for dp in list_of_dependent_properties:
if dp in current_properties and (p not in current_properties or (v and not current_properties(p) == v)):
failed_dependency_pairs.append((p, dp))
if failed_dependency_pairs:
raise DependentPropertiestError(self.__class__, failed_dependency_pairs)
def _check_object_constaints(self):
if self.granular_markings:
for m in self.granular_markings:

View File

@ -90,16 +90,44 @@ class UnmodifiablePropertyError(STIXError, ValueError):
return msg.format(", ".join(self.unchangable_properties))
class ObjectConstraintError(STIXError, TypeError):
"""Violating some interproperty constraint of a STIX object type."""
class MutuallyExclusivePropertiesError(STIXError, TypeError):
"""Violating interproperty mutually exclusive constraint of a STIX object type."""
def __init__(self, cls, fields):
super(ObjectConstraintError, self).__init__()
super(MutuallyExclusivePropertiesError, self).__init__()
self.cls = cls
self.fields = sorted(list(fields))
def __str__(self):
msg = "The field(s) for {0}: ({1}) are not consistent."
msg = "The field(s) for {0}: ({1}) are mutually exclusive."
return msg.format(self.cls.__name__,
", ".join(x for x in self.fields))
class DependentPropertiestError(STIXError, TypeError):
"""Violating interproperty dependency constraint of a STIX object type."""
def __init__(self, cls, dependencies):
super(DependentPropertiestError, self).__init__()
self.cls = cls
self.dependencies = dependencies
def __str__(self):
msg = "The property dependencies for {0}: ({1}) are not met."
return msg.format(self.cls.__name__,
", ".join(x for x in self.dependencies))
class AtLeastOnePropertyError(STIXError, TypeError):
"""Violating a constraint of a STIX object type that at least one of the given properties must be populated."""
def __init__(self, cls, fields):
super(AtLeastOnePropertyError, self).__init__()
self.cls = cls
self.fields = sorted(list(fields))
def __str__(self):
msg = "At least one of the field(s) for {0}: ({1}) must be populated."
return msg.format(self.cls.__name__,
", ".join(x for x in self.fields))

View File

@ -6,11 +6,10 @@ and do not have a '_type' attribute.
"""
from .base import _Observable, _STIXBase
from .exceptions import ObjectConstraintError
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, HashesProperty,
HexProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, Property, StringProperty,
ObjectReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
@ -24,6 +23,11 @@ class Artifact(_Observable):
'hashes': HashesProperty(),
}
def _check_object_constaints(self):
super(Artifact, self)._check_object_constaints()
self._check_mutually_exclusive_properties(["payload_bin", "url"])
self._check_properties_dependency(["hashes"], ["url"])
class AutonomousSystem(_Observable):
_type = 'autonomous-system'
@ -76,6 +80,10 @@ class EmailMIMEComponent(_STIXBase):
'content_disposition': StringProperty(),
}
def _check_object_constaints(self):
super(EmailMIMEComponent, self)._check_object_constaints()
self._check_at_least_one_property(["body", "body_raw_ref"])
class EmailMessage(_Observable):
_type = 'email-message'
@ -97,6 +105,11 @@ class EmailMessage(_Observable):
'raw_email_ref': ObjectReferenceProperty(),
}
def _check_object_constaints(self):
super(EmailMessage, self)._check_object_constaints()
self._check_properties_dependency(["is_multipart"], ["body_multipart"])
# self._dependency(["is_multipart"], ["body"], [False])
class File(_Observable):
_type = 'file'
@ -123,15 +136,8 @@ class File(_Observable):
def _check_object_constaints(self):
super(File, self)._check_object_constaints()
illegal_properties = []
current_properties = self.properties_populated()
if not self.is_encrypted:
for p in ["encryption_algorithm", "decryption_key"]:
if p in current_properties:
illegal_properties.append(p)
if illegal_properties:
illegal_properties.append("is_encrypted")
raise ObjectConstraintError(self.__class__, illegal_properties)
self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"])
self._check_at_least_one_property(["hashes", "name"])
class IPv4Address(_Observable):
@ -182,7 +188,7 @@ class NetworkTraffic(_Observable):
'dst_ref': ObjectReferenceProperty(),
'src_port': IntegerProperty(),
'dst_port': IntegerProperty(),
'protocols': ListProperty(StringProperty),
'protocols': ListProperty(StringProperty, required=True),
'src_byte_count': IntegerProperty(),
'dst_byte_count': IntegerProperty(),
'src_packets': IntegerProperty(),
@ -194,6 +200,10 @@ class NetworkTraffic(_Observable):
'encapsulates_by_ref': ObjectReferenceProperty(),
}
def _check_object_constaints(self):
super(NetworkTraffic, self)._check_object_constaints()
self._check_at_least_one_property(["src_ref", "dst_ref"])
class Process(_Observable):
_type = 'process'
@ -299,6 +309,28 @@ class WindowsRegistryKey(_Observable):
return self._inner['values']
class X509V3ExtenstionsType(_STIXBase):
_type = 'x509-v3-extensions-type'
_properties = {
'basic_constraints': StringProperty(),
'name_constraints': StringProperty(),
'policy_constraints': StringProperty(),
'key_usage': StringProperty(),
'extended_key_usage': StringProperty(),
'subject_key_identifier': StringProperty(),
'authority_key_identifier': StringProperty(),
'subject_alternative_name': StringProperty(),
'issuer_alternative_name': StringProperty(),
'subject_directory_attributes': StringProperty(),
'crl_distribution_points': StringProperty(),
'inhibit_any_policy': StringProperty(),
'private_key_usage_period_not_before': TimestampProperty(),
'private_key_usage_period_not_after': TimestampProperty(),
'certificate_policies': StringProperty(),
'policy_mappings': StringProperty(),
}
class X509Certificate(_Observable):
_type = 'x509-certificate'
_properties = {
@ -315,5 +347,5 @@ class X509Certificate(_Observable):
'subject_public_key_algorithm': StringProperty(),
'subject_public_key_modulus': StringProperty(),
'subject_public_key_exponent': IntegerProperty(),
'x509_v3_extensions': Property(),
'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType),
}

View File

@ -20,6 +20,7 @@ EXPECTED = """{
"number_observed": 50,
"objects": {
"0": {
"name": "foo.exe",
"type": "file"
}
},
@ -38,7 +39,8 @@ def test_observed_data_example():
number_observed=50,
objects={
"0": {
"type": "file",
"name": "foo.exe",
"type": "file"
},
},
)
@ -82,8 +84,8 @@ def test_observed_data_example_with_refs():
number_observed=50,
objects={
"0": {
"type": "file",
"name": "foo.exe"
"name": "foo.exe",
"type": "file"
},
"1": {
"type": "directory",
@ -137,6 +139,7 @@ def test_observed_data_example_with_bad_refs():
"number_observed": 50,
"objects": {
"0": {
"name": "foo.exe",
"type": "file"
}
}
@ -278,8 +281,192 @@ def test_parse_email_message(data):
assert odata.body_multipart[0].content_disposition == "inline"
@pytest.mark.parametrize("data", [
"""
{
"type": "email-message",
"is_multipart": true,
"content_type": "multipart/mixed",
"date": "2016-06-19T14:20:40.000Z",
"from_ref": "1",
"to_refs": [
"2"
],
"cc_refs": [
"3"
],
"subject": "Check out this picture of a cat!",
"additional_header_fields": {
"Content-Disposition": "inline",
"X-Mailer": "Mutt/1.5.23",
"X-Originating-IP": "198.51.100.3"
},
"body_multipart": [
{
"content_type": "text/plain; charset=utf-8",
"content_disposition": "inline",
"body": "Cats are funny!"
},
{
"content_type": "image/png",
"content_disposition": "attachment; filename=\\"tabby.png\\""
},
{
"content_type": "application/zip",
"content_disposition": "attachment; filename=\\"tabby_pics.zip\\"",
"body_raw_ref": "5"
}
]
}
"""
])
def test_parse_email_message_with_at_least_one_error(data):
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.parse_observable(data, [str(i) for i in range(1, 6)])
assert excinfo.value.cls == stix2.EmailMIMEComponent
assert excinfo.value.fields == ["body", "body_raw_ref"]
@pytest.mark.parametrize("data", [
"""
{
"type": "network-traffic",
"src_ref": "0",
"dst_ref": "1",
"protocols": [
"tcp"
]
}
"""
])
def test_parse_basic_tcp_traffic(data):
odata = stix2.parse_observable(data, ["0", "1"])
assert odata.type == "network-traffic"
assert odata.src_ref == "0"
assert odata.dst_ref == "1"
assert odata.protocols == ["tcp"]
@pytest.mark.parametrize("data", [
"""
{
"type": "network-traffic",
"src_port": 2487,
"dst_port": 1723,
"protocols": [
"ipv4",
"pptp"
],
"src_byte_count": 35779,
"dst_byte_count": 935750,
"encapsulates_refs": [
"4"
]
}
"""
])
def test_parse_basic_tcp_traffic_with_error(data):
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.parse_observable(data, ["4"])
assert excinfo.value.cls == stix2.NetworkTraffic
assert excinfo.value.fields == ["dst_ref", "src_ref"]
EXPECTED_PROCESS_OD = """{
"created": "2016-04-06T19:58:16Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
"first_observed": "2015-12-21T19:00:00Z",
"id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
"last_observed": "2015-12-21T19:00:00Z",
"modified": "2016-04-06T19:58:16Z",
"number_observed": 50,
"objects": {
"0": {
"type": "file",
"hashes": {
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100fSHA"
},
},
"1": {
"type": "process",
"pid": 1221,
"name": "gedit-bin",
"created": "2016-01-20T14:11:25.55Z",
"arguments" :[
"--new-window"
],
"binary_ref": "0"
}
},
"type": "observed-data"
}"""
def test_observed_data_with_process_example():
observed_data = stix2.ObservedData(
id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
created="2016-04-06T19:58:16Z",
modified="2016-04-06T19:58:16Z",
first_observed="2015-12-21T19:00:00Z",
last_observed="2015-12-21T19:00:00Z",
number_observed=50,
objects={
"0": {
"type": "file",
"hashes": {
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
},
},
"1": {
"type": "process",
"pid": 1221,
"name": "gedit-bin",
"created": "2016-01-20T14:11:25.55Z",
"arguments": [
"--new-window"
],
"binary_ref": "0"
}
})
assert observed_data.objects["0"].type == "file"
assert observed_data.objects["0"].hashes["SHA-256"] == "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
assert observed_data.objects["1"].type == "process"
assert observed_data.objects["1"].pid == 1221
assert observed_data.objects["1"].name == "gedit-bin"
assert observed_data.objects["1"].arguments[0] == "--new-window"
# creating cyber observables directly
def test_artifact_example():
art = stix2.Artifact(mime_type="image/jpeg",
url="https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg",
hashes={
"MD5": "6826f9a05da08134006557758bb3afbb"
})
assert art.mime_type == "image/jpeg"
assert art.url == "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg"
assert art.hashes["MD5"] == "6826f9a05da08134006557758bb3afbb"
def test_artifact_mutual_exclusion_error():
with pytest.raises(stix2.exceptions.MutuallyExclusivePropertiesError) as excinfo:
stix2.Artifact(mime_type="image/jpeg",
url="https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg",
hashes={
"MD5": "6826f9a05da08134006557758bb3afbb"
},
payload_bin="VBORw0KGgoAAAANSUhEUgAAADI==")
assert excinfo.value.cls == stix2.Artifact
assert excinfo.value.fields == ["payload_bin", "url"]
def test_directory_example():
dir = stix2.Directory(_valid_refs=["1"],
path='/usr/lib',
@ -346,14 +533,14 @@ def test_file_example():
def test_file_example_encryption_error():
with pytest.raises(stix2.exceptions.ObjectConstraintError) as excinfo:
with pytest.raises(stix2.exceptions.DependentPropertiestError) as excinfo:
stix2.File(name="qwerty.dll",
is_encrypted=False,
encryption_algorithm="AES128-CBC"
)
assert excinfo.value.cls == stix2.File
assert excinfo.value.fields == ["encryption_algorithm", "is_encrypted"]
assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")]
def test_ip4_address_example():
@ -401,6 +588,39 @@ def test_software_example():
assert s.vendor == "Microsoft"
def test_url_example():
s = stix2.URL(value="https://example.com/research/index.html")
assert s.type == "url"
assert s.value == "https://example.com/research/index.html"
def test_user_account_example():
a = stix2.UserAccount(user_id="1001",
account_login="jdoe",
account_type="unix",
display_name="John Doe",
is_service_account=False,
is_privileged=False,
can_escalate_privs=True,
account_created="2016-01-20T12:31:12Z",
password_last_changed="2016-01-20T14:27:43Z",
account_first_login="2016-01-20T14:26:07Z",
account_last_login="2016-07-22T16:08:28Z")
assert a.user_id == "1001"
assert a.account_login == "jdoe"
assert a.account_type == "unix"
assert a.display_name == "John Doe"
assert not a.is_service_account
assert not a.is_privileged
assert a.can_escalate_privs
assert a.account_created == dt.datetime(2016, 1, 20, 12, 31, 12, tzinfo=pytz.utc)
assert a.password_last_changed == dt.datetime(2016, 1, 20, 14, 27, 43, tzinfo=pytz.utc)
assert a.account_first_login == dt.datetime(2016, 1, 20, 14, 26, 7, tzinfo=pytz.utc)
assert a.account_last_login == dt.datetime(2016, 7, 22, 16, 8, 28, tzinfo=pytz.utc)
def test_windows_registry_key_example():
with pytest.raises(ValueError):
v = stix2.WindowsRegistryValueType(name="Foo",
@ -416,3 +636,15 @@ def test_windows_registry_key_example():
assert w.values[0].name == "Foo"
assert w.values[0].data == "qwerty"
assert w.values[0].data_type == "REG_SZ"
def test_x509_certificate_example():
x509 = stix2.X509Certificate(
issuer="C=ZA, ST=Western Cape, L=Cape Town, O=Thawte Consulting cc, OU=Certification Services Division, CN=Thawte Server CA/emailAddress=server-certs@thawte.com", # noqa
validity_not_before="2016-03-12T12:00:00Z",
validity_not_after="2016-08-21T12:00:00Z",
subject="C=US, ST=Maryland, L=Pasadena, O=Brent Baccala, OU=FreeSoft, CN=www.freesoft.org/emailAddress=baccala@freesoft.org") # noqa
assert x509.type == "x509-certificate"
assert x509.issuer == "C=ZA, ST=Western Cape, L=Cape Town, O=Thawte Consulting cc, OU=Certification Services Division, CN=Thawte Server CA/emailAddress=server-certs@thawte.com" # noqa
assert x509.subject == "C=US, ST=Maryland, L=Pasadena, O=Brent Baccala, OU=FreeSoft, CN=www.freesoft.org/emailAddress=baccala@freesoft.org" # noqa