Add ExtensionsProperty and ArchiveExt

stix2.1
clenk 2017-05-12 11:22:23 -04:00
parent 13245d28ce
commit 0568a0e671
4 changed files with 77 additions and 5 deletions

View File

@ -4,8 +4,8 @@
from . import exceptions from . import exceptions
from .bundle import Bundle from .bundle import Bundle
from .observables import (URL, Artifact, AutonomousSystem, Directory, from .observables import (URL, ArchiveExt, Artifact, AutonomousSystem,
DomainName, EmailAddress, EmailMessage, Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, File, IPv4Address, IPv6Address, EmailMIMEComponent, File, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, Process, Software, MACAddress, Mutex, NetworkTraffic, Process, Software,
UserAccount, WindowsRegistryKey, UserAccount, WindowsRegistryKey,
@ -57,6 +57,14 @@ OBJ_MAP_OBSERVABLE = {
'x509-certificate': X509Certificate, 'x509-certificate': X509Certificate,
} }
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
}
def parse(data): def parse(data):
"""Deserialize a string or file-like object into a STIX object""" """Deserialize a string or file-like object into a STIX object"""
@ -90,6 +98,14 @@ def parse_observable(data, _valid_refs):
try: try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']] obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError: except KeyError:
# TODO handle custom objects # TODO handle custom observable objects
raise ValueError("Can't parse unknown object type '%s'!" % obj['type']) raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(**obj['extensions'][name])
return obj_class(**obj) return obj_class(**obj)

View File

@ -8,7 +8,7 @@ and do not have a '_type' attribute.
from .base import _Observable, _STIXBase from .base import _Observable, _STIXBase
from .exceptions import ObjectConstraintError from .exceptions import ObjectConstraintError
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, HashesProperty, EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, HashesProperty,
HexProperty, IntegerProperty, ListProperty, HexProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, Property, StringProperty, ObjectReferenceProperty, Property, StringProperty,
TimestampProperty, TypeProperty) TimestampProperty, TypeProperty)
@ -98,11 +98,19 @@ class EmailMessage(_Observable):
} }
class ArchiveExt(_STIXBase):
_properties = {
'contains_refs': ListProperty(ObjectReferenceProperty, required=True),
'version': StringProperty(),
'comment': StringProperty(),
}
class File(_Observable): class File(_Observable):
_type = 'file' _type = 'file'
_properties = { _properties = {
'type': TypeProperty(_type), 'type': TypeProperty(_type),
# extensions 'extensions': ExtensionsProperty(),
'hashes': HashesProperty(), 'hashes': HashesProperty(),
'size': IntegerProperty(), 'size': IntegerProperty(),
'name': StringProperty(), 'name': StringProperty(),

View File

@ -376,3 +376,7 @@ class EnumProperty(StringProperty):
if value not in self.allowed: if value not in self.allowed:
raise ValueError("value '%s' is not valid for this enumeration." % value) raise ValueError("value '%s' is not valid for this enumeration." % value)
return self.string_type(value) return self.string_type(value)
class ExtensionsProperty(DictionaryProperty):
pass

View File

@ -278,6 +278,50 @@ def test_parse_email_message(data):
assert odata.body_multipart[0].content_disposition == "inline" assert odata.body_multipart[0].content_disposition == "inline"
@pytest.mark.parametrize("data", [
""""0": {
"type": "file",
"hashes": {
"SHA-256": "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a"
}
},
"1": {
"type": "file",
"hashes": {
"SHA-256": "19c549ec2628b989382f6b280cbd7bb836a0b461332c0fe53511ce7d584b89d3"
}
},
"2": {
"type": "file",
"hashes": {
"SHA-256": "0969de02ecf8a5f003e3f6d063d848c8a193aada092623f8ce408c15bcb5f038"
}
},
"3": {
"type": "file",
"name": "foo.zip",
"hashes": {
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
},
"mime_type": "application/zip",
"extensions": {
"archive-ext": {
"contains_refs": [
"0",
"1",
"2"
],
"version": "5.0"
}
}
}""",
])
def test_parse_file_address(data):
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
odata = stix2.parse(odata_str)
assert odata.objects["3"].extensions['archive-ext'].version == "5.0"
# creating cyber observables directly # creating cyber observables directly
def test_directory_example(): def test_directory_example():