Merge pull request #3 from oasis-open/parse-cyber-observables

Parse cyber observables
stix2.1
Rich Piazza 2017-05-09 12:45:10 -04:00 committed by GitHub
commit 5de2cfe6a8
6 changed files with 128 additions and 14 deletions

View File

@ -3,8 +3,9 @@
# flake8: noqa # flake8: noqa
from .bundle import Bundle from .bundle import Bundle
from .observables import Artifact, AutonomousSystem, Directory, DomainName, EmailAddress, File, IPv4Address, \ from .observables import Artifact, AutonomousSystem, Directory, DomainName, \
IPv6Address, MACAddress, Mutex, NetworkTraffic, Process, Software, URL, UserAccount, WindowsRegistryKey, \ EmailAddress, EmailMessage, File, IPv4Address, IPv6Address, MACAddress, \
Mutex, NetworkTraffic, Process, Software, URL, UserAccount, WindowsRegistryKey, \
X509Certificate X509Certificate
from .other import ExternalReference, KillChainPhase, MarkingDefinition, \ from .other import ExternalReference, KillChainPhase, MarkingDefinition, \
GranularMarking, StatementMarking, TLPMarking GranularMarking, StatementMarking, TLPMarking
@ -40,6 +41,7 @@ OBJ_MAP_OBSERVABLE = {
'directory': Directory, 'directory': Directory,
'domain-name': DomainName, 'domain-name': DomainName,
'email-address': EmailAddress, 'email-address': EmailAddress,
'email-message': EmailMessage,
'file': File, 'file': File,
'ipv4-addr': IPv4Address, 'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address, 'ipv6-addr': IPv6Address,

View File

@ -161,7 +161,7 @@ class Observable(_STIXBase):
ref = kwargs[prop_name] ref = kwargs[prop_name]
if ref not in self._STIXBase__valid_refs: if ref not in self._STIXBase__valid_refs:
raise InvalidObjRefError(self.__class__, prop_name, "'%s' is not a valid object in local scope" % ref) raise InvalidObjRefError(self.__class__, prop_name, "'%s' is not a valid object in local scope" % ref)
if prop_name.endswith('_refs') and prop_name in kwargs: elif prop_name.endswith('_refs') and prop_name in kwargs:
for ref in kwargs[prop_name]: for ref in kwargs[prop_name]:
if ref not in self._STIXBase__valid_refs: if ref not in self._STIXBase__valid_refs:
raise InvalidObjRefError(self.__class__, prop_name, "'%s' is not a valid object in local scope" % ref) raise InvalidObjRefError(self.__class__, prop_name, "'%s' is not a valid object in local scope" % ref)

View File

@ -1,12 +1,15 @@
"""STIX 2.0 Cyber Observable Objects""" """STIX 2.0 Cyber Observable Objects
Embedded observable object types, such as Email MIME Component, which is
embedded in Email Message objects, inherit from _STIXBase instead of Observable
and do not have a '_type' attribute.
"""
from .base import _STIXBase, Observable from .base import _STIXBase, Observable
# from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, from .properties import BinaryProperty, BooleanProperty, DictionaryProperty, \
# HashesProperty, HexProperty, IDProperty, EmbeddedObjectProperty, HashesProperty, HexProperty, IntegerProperty, \
# IntegerProperty, ListProperty, ReferenceProperty, ListProperty, ObjectReferenceProperty, Property, StringProperty, \
# StringProperty, TimestampProperty, TypeProperty) TimestampProperty, TypeProperty
from .properties import BinaryProperty, BooleanProperty, DictionaryProperty, HashesProperty, HexProperty, \
IntegerProperty, ListProperty, ObjectReferenceProperty, Property, StringProperty, TimestampProperty, TypeProperty
class Artifact(Observable): class Artifact(Observable):
@ -63,6 +66,36 @@ class EmailAddress(Observable):
} }
class EmailMIMEComponent(_STIXBase):
_properties = {
'body': StringProperty(),
'body_raw_ref': ObjectReferenceProperty(),
'content_type': StringProperty(),
'content_disposition': StringProperty(),
}
class EmailMessage(Observable):
_type = 'email-message'
_properties = {
'type': TypeProperty(_type),
'is_multipart': BooleanProperty(required=True),
'date': TimestampProperty(),
'content_type': StringProperty(),
'from_ref': ObjectReferenceProperty(),
'sender_ref': ObjectReferenceProperty(),
'to_refs': ListProperty(ObjectReferenceProperty),
'cc_refs': ListProperty(ObjectReferenceProperty),
'bcc_refs': ListProperty(ObjectReferenceProperty),
'subject': StringProperty(),
'received_lines': ListProperty(StringProperty),
'additional_header_fields': DictionaryProperty(),
'body': StringProperty(),
'body_multipart': ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent)),
'raw_email_ref': ObjectReferenceProperty(),
}
class File(Observable): class File(Observable):
_type = 'file' _type = 'file'
_properties = { _properties = {

View File

@ -115,10 +115,15 @@ class ListProperty(Property):
# TODO Should we raise an error here? # TODO Should we raise an error here?
valid = item valid = item
if isinstance(valid, collections.Mapping): if type(self.contained) is EmbeddedObjectProperty:
result.append(self.contained(**valid)) obj_type = self.contained.type
else: else:
result.append(self.contained(valid)) obj_type = self.contained
if isinstance(valid, collections.Mapping):
result.append(obj_type(**valid))
else:
result.append(obj_type(valid))
# STIX spec forbids empty lists # STIX spec forbids empty lists
if len(result) < 1: if len(result) < 1:
@ -339,3 +344,16 @@ class SelectorProperty(Property):
class ObjectReferenceProperty(StringProperty): class ObjectReferenceProperty(StringProperty):
pass pass
class EmbeddedObjectProperty(Property):
def __init__(self, type, required=False):
self.type = type
super(EmbeddedObjectProperty, self).__init__(required, type=type)
def clean(self, value):
if type(value) is dict:
value = self.type(**value)
elif not isinstance(value, self.type):
raise ValueError("must be of type %s." % self.type.__name__)
return value

View File

@ -230,6 +230,52 @@ def test_parse_email_address(data):
stix2.parse(odata_str) stix2.parse(odata_str)
@pytest.mark.parametrize("data", [
"""
{
"type": "email-message",
"is_multipart": true,
"content_type": "multipart/mixed",
"date": "2016-06-19T14:20:40.000Z",
"from_ref": "1",
"to_refs": [
"2"
],
"cc_refs": [
"3"
],
"subject": "Check out this picture of a cat!",
"additional_header_fields": {
"Content-Disposition": "inline",
"X-Mailer": "Mutt/1.5.23",
"X-Originating-IP": "198.51.100.3"
},
"body_multipart": [
{
"content_type": "text/plain; charset=utf-8",
"content_disposition": "inline",
"body": "Cats are funny!"
},
{
"content_type": "image/png",
"content_disposition": "attachment; filename=\\"tabby.png\\"",
"body_raw_ref": "4"
},
{
"content_type": "application/zip",
"content_disposition": "attachment; filename=\\"tabby_pics.zip\\"",
"body_raw_ref": "5"
}
]
}
"""
])
def test_parse_email_message(data):
odata = stix2.parse_observable(data, [str(i) for i in range(1, 6)])
assert odata.type == "email-message"
assert odata.body_multipart[0].content_disposition == "inline"
# creating cyber observables directly # creating cyber observables directly
def test_directory_example(): def test_directory_example():

View File

@ -1,8 +1,10 @@
import pytest import pytest
from stix2.exceptions import DictionaryKeyError from stix2.exceptions import DictionaryKeyError
from stix2.observables import EmailMIMEComponent
from stix2.properties import (BinaryProperty, BooleanProperty, from stix2.properties import (BinaryProperty, BooleanProperty,
DictionaryProperty, HashesProperty, HexProperty, DictionaryProperty, EmbeddedObjectProperty,
HashesProperty, HexProperty,
IDProperty, IntegerProperty, ListProperty, IDProperty, IntegerProperty, ListProperty,
Property, ReferenceProperty, StringProperty, Property, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty) TimestampProperty, TypeProperty)
@ -232,3 +234,16 @@ def test_hashes_property_invalid(value):
with pytest.raises(ValueError): with pytest.raises(ValueError):
hash_prop.clean(value) hash_prop.clean(value)
def test_embedded_property():
emb_prop = EmbeddedObjectProperty(type=EmailMIMEComponent)
mime = EmailMIMEComponent(
content_type="text/plain; charset=utf-8",
content_disposition="inline",
body="Cats are funny!"
)
assert emb_prop.clean(mime)
with pytest.raises(ValueError):
emb_prop.clean("string")