Merge branch 'cyber-observables' of https://github.com/rpiazza/cti-python-stix2 into cyber-observables
commit
c20f640910
|
@ -4,11 +4,12 @@
|
||||||
|
|
||||||
from . import exceptions
|
from . import exceptions
|
||||||
from .bundle import Bundle
|
from .bundle import Bundle
|
||||||
from .observables import (URL, Artifact, AutonomousSystem, Directory,
|
from .observables import (URL, ArchiveExt, Artifact, AutonomousSystem,
|
||||||
DomainName, EmailAddress, EmailMessage, EmailMIMEComponent, File,
|
Directory, DomainName, EmailAddress, EmailMessage,
|
||||||
IPv4Address, IPv6Address, MACAddress, Mutex,
|
EmailMIMEComponent, File, IPv4Address, IPv6Address,
|
||||||
NetworkTraffic, Process, Software, UserAccount,
|
MACAddress, Mutex, NetworkTraffic, Process, Software,
|
||||||
WindowsRegistryKey, WindowsRegistryValueType, X509Certificate)
|
UserAccount, WindowsRegistryKey,
|
||||||
|
WindowsRegistryValueType, X509Certificate)
|
||||||
from .other import (ExternalReference, GranularMarking, KillChainPhase,
|
from .other import (ExternalReference, GranularMarking, KillChainPhase,
|
||||||
MarkingDefinition, StatementMarking, TLPMarking)
|
MarkingDefinition, StatementMarking, TLPMarking)
|
||||||
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
|
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
|
||||||
|
@ -56,6 +57,14 @@ OBJ_MAP_OBSERVABLE = {
|
||||||
'x509-certificate': X509Certificate,
|
'x509-certificate': X509Certificate,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EXT_MAP_FILE = {
|
||||||
|
'archive-ext': ArchiveExt,
|
||||||
|
}
|
||||||
|
|
||||||
|
EXT_MAP = {
|
||||||
|
'file': EXT_MAP_FILE,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def parse(data):
|
def parse(data):
|
||||||
"""Deserialize a string or file-like object into a STIX object"""
|
"""Deserialize a string or file-like object into a STIX object"""
|
||||||
|
@ -89,6 +98,14 @@ def parse_observable(data, _valid_refs):
|
||||||
try:
|
try:
|
||||||
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# TODO handle custom objects
|
# TODO handle custom observable objects
|
||||||
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
|
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
|
||||||
|
|
||||||
|
if 'extensions' in obj and obj['type'] in EXT_MAP:
|
||||||
|
for name, ext in obj['extensions'].items():
|
||||||
|
if name not in EXT_MAP[obj['type']]:
|
||||||
|
raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
|
||||||
|
ext_class = EXT_MAP[obj['type']][name]
|
||||||
|
obj['extensions'][name] = ext_class(**obj['extensions'][name])
|
||||||
|
|
||||||
return obj_class(**obj)
|
return obj_class(**obj)
|
||||||
|
|
|
@ -7,7 +7,7 @@ and do not have a '_type' attribute.
|
||||||
|
|
||||||
from .base import _Observable, _STIXBase
|
from .base import _Observable, _STIXBase
|
||||||
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
|
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
|
||||||
EmbeddedObjectProperty, EnumProperty, HashesProperty,
|
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, HashesProperty,
|
||||||
HexProperty, IntegerProperty, ListProperty,
|
HexProperty, IntegerProperty, ListProperty,
|
||||||
ObjectReferenceProperty, StringProperty,
|
ObjectReferenceProperty, StringProperty,
|
||||||
TimestampProperty, TypeProperty)
|
TimestampProperty, TypeProperty)
|
||||||
|
@ -111,11 +111,19 @@ class EmailMessage(_Observable):
|
||||||
# self._dependency(["is_multipart"], ["body"], [False])
|
# self._dependency(["is_multipart"], ["body"], [False])
|
||||||
|
|
||||||
|
|
||||||
|
class ArchiveExt(_STIXBase):
|
||||||
|
_properties = {
|
||||||
|
'contains_refs': ListProperty(ObjectReferenceProperty, required=True),
|
||||||
|
'version': StringProperty(),
|
||||||
|
'comment': StringProperty(),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class File(_Observable):
|
class File(_Observable):
|
||||||
_type = 'file'
|
_type = 'file'
|
||||||
_properties = {
|
_properties = {
|
||||||
'type': TypeProperty(_type),
|
'type': TypeProperty(_type),
|
||||||
# extensions
|
'extensions': ExtensionsProperty(),
|
||||||
'hashes': HashesProperty(),
|
'hashes': HashesProperty(),
|
||||||
'size': IntegerProperty(),
|
'size': IntegerProperty(),
|
||||||
'name': StringProperty(),
|
'name': StringProperty(),
|
||||||
|
|
|
@ -376,3 +376,7 @@ class EnumProperty(StringProperty):
|
||||||
if value not in self.allowed:
|
if value not in self.allowed:
|
||||||
raise ValueError("value '%s' is not valid for this enumeration." % value)
|
raise ValueError("value '%s' is not valid for this enumeration." % value)
|
||||||
return self.string_type(value)
|
return self.string_type(value)
|
||||||
|
|
||||||
|
|
||||||
|
class ExtensionsProperty(DictionaryProperty):
|
||||||
|
pass
|
||||||
|
|
|
@ -281,6 +281,50 @@ def test_parse_email_message(data):
|
||||||
assert odata.body_multipart[0].content_disposition == "inline"
|
assert odata.body_multipart[0].content_disposition == "inline"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("data", [
|
||||||
|
""""0": {
|
||||||
|
"type": "file",
|
||||||
|
"hashes": {
|
||||||
|
"SHA-256": "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"1": {
|
||||||
|
"type": "file",
|
||||||
|
"hashes": {
|
||||||
|
"SHA-256": "19c549ec2628b989382f6b280cbd7bb836a0b461332c0fe53511ce7d584b89d3"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"2": {
|
||||||
|
"type": "file",
|
||||||
|
"hashes": {
|
||||||
|
"SHA-256": "0969de02ecf8a5f003e3f6d063d848c8a193aada092623f8ce408c15bcb5f038"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"3": {
|
||||||
|
"type": "file",
|
||||||
|
"name": "foo.zip",
|
||||||
|
"hashes": {
|
||||||
|
"SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f"
|
||||||
|
},
|
||||||
|
"mime_type": "application/zip",
|
||||||
|
"extensions": {
|
||||||
|
"archive-ext": {
|
||||||
|
"contains_refs": [
|
||||||
|
"0",
|
||||||
|
"1",
|
||||||
|
"2"
|
||||||
|
],
|
||||||
|
"version": "5.0"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}""",
|
||||||
|
])
|
||||||
|
def test_parse_file_archive(data):
|
||||||
|
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
|
||||||
|
odata = stix2.parse(odata_str)
|
||||||
|
assert odata.objects["3"].extensions['archive-ext'].version == "5.0"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("data", [
|
@pytest.mark.parametrize("data", [
|
||||||
"""
|
"""
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue