Add Artifact type
parent
c63ba8e447
commit
2c67b90638
|
@ -3,6 +3,7 @@
|
||||||
# flake8: noqa
|
# flake8: noqa
|
||||||
|
|
||||||
from .bundle import Bundle
|
from .bundle import Bundle
|
||||||
|
from .observables import Artifact, File
|
||||||
from .other import ExternalReference, KillChainPhase, MarkingDefinition, \
|
from .other import ExternalReference, KillChainPhase, MarkingDefinition, \
|
||||||
GranularMarking, StatementMarking, TLPMarking
|
GranularMarking, StatementMarking, TLPMarking
|
||||||
from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \
|
from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \
|
||||||
|
@ -31,8 +32,13 @@ OBJ_MAP = {
|
||||||
'vulnerability': Vulnerability,
|
'vulnerability': Vulnerability,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
OBJ_MAP_OBSERVABLE = {
|
||||||
|
'artifact': Artifact,
|
||||||
|
'file': File,
|
||||||
|
}
|
||||||
|
|
||||||
def parse(data):
|
|
||||||
|
def parse(data, observable=False):
|
||||||
"""Deserialize a string or file-like object into a STIX object"""
|
"""Deserialize a string or file-like object into a STIX object"""
|
||||||
|
|
||||||
obj = get_dict(data)
|
obj = get_dict(data)
|
||||||
|
@ -42,10 +48,13 @@ def parse(data):
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
obj_class = OBJ_MAP[obj['type']]
|
if observable:
|
||||||
return obj_class(**obj)
|
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
|
||||||
|
else:
|
||||||
|
obj_class = OBJ_MAP[obj['type']]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# TODO handle custom objects
|
# TODO handle custom objects
|
||||||
raise ValueError("Can't parse unknown object type!")
|
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
|
||||||
|
return obj_class(**obj)
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
"""STIX 2.0 Cyber Observable Objects"""
|
||||||
|
|
||||||
|
from .base import Observable
|
||||||
|
# from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
|
||||||
|
# HashesProperty, HexProperty, IDProperty,
|
||||||
|
# IntegerProperty, ListProperty, ReferenceProperty,
|
||||||
|
# StringProperty, TimestampProperty, TypeProperty)
|
||||||
|
from .properties import BinaryProperty, HashesProperty, StringProperty, TypeProperty
|
||||||
|
|
||||||
|
|
||||||
|
class Artifact(Observable):
|
||||||
|
_type = 'artifact'
|
||||||
|
_properties = {
|
||||||
|
'type': TypeProperty(_type),
|
||||||
|
'mime_type': StringProperty(),
|
||||||
|
'payload_bin': BinaryProperty(),
|
||||||
|
'url': StringProperty(),
|
||||||
|
'hashes': HashesProperty(),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class File(Observable):
|
||||||
|
_type = 'file'
|
||||||
|
_properties = {
|
||||||
|
'type': TypeProperty(_type),
|
||||||
|
}
|
|
@ -220,10 +220,14 @@ class ObservableProperty(Property):
|
||||||
|
|
||||||
def clean(self, value):
|
def clean(self, value):
|
||||||
dictified = dict(value)
|
dictified = dict(value)
|
||||||
for obj in dictified:
|
from .__init__ import parse # avoid circular import
|
||||||
if not issubclass(type(obj), Observable):
|
for key, obj in dictified.items():
|
||||||
|
parsed_obj = parse(obj, observable=True)
|
||||||
|
if not issubclass(type(parsed_obj), Observable):
|
||||||
raise ValueError("Objects in an observable property must be "
|
raise ValueError("Objects in an observable property must be "
|
||||||
"Cyber Observable Objects")
|
"Cyber Observable Objects")
|
||||||
|
dictified[key] = parsed_obj
|
||||||
|
|
||||||
return dictified
|
return dictified
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,9 @@
|
||||||
from .base import _STIXBase
|
from .base import _STIXBase
|
||||||
from .common import COMMON_PROPERTIES
|
from .common import COMMON_PROPERTIES
|
||||||
from .other import KillChainPhase
|
from .other import KillChainPhase
|
||||||
from .properties import (IDProperty, IntegerProperty, ListProperty, Property,
|
from .properties import (IDProperty, IntegerProperty, ListProperty,
|
||||||
ReferenceProperty, StringProperty, TimestampProperty,
|
ObservableProperty, ReferenceProperty,
|
||||||
TypeProperty)
|
StringProperty, TimestampProperty, TypeProperty)
|
||||||
from .utils import NOW
|
from .utils import NOW
|
||||||
|
|
||||||
|
|
||||||
|
@ -126,7 +126,7 @@ class ObservedData(_STIXBase):
|
||||||
'first_observed': TimestampProperty(required=True),
|
'first_observed': TimestampProperty(required=True),
|
||||||
'last_observed': TimestampProperty(required=True),
|
'last_observed': TimestampProperty(required=True),
|
||||||
'number_observed': IntegerProperty(required=True),
|
'number_observed': IntegerProperty(required=True),
|
||||||
'objects': Property(),
|
'objects': ObservableProperty(),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
|
import re
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import pytz
|
import pytz
|
||||||
|
@ -70,6 +71,48 @@ def test_parse_observed_data(data):
|
||||||
assert odata.first_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc)
|
assert odata.first_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc)
|
||||||
assert odata.last_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc)
|
assert odata.last_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc)
|
||||||
assert odata.created_by_ref == "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff"
|
assert odata.created_by_ref == "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff"
|
||||||
# assert odata.objects["0"].type == "file" # TODO
|
assert odata.objects["0"].type == "file"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("data", [
|
||||||
|
""""0": {
|
||||||
|
"type": "artifact",
|
||||||
|
"mime_type": "image/jpeg",
|
||||||
|
"payload_bin": "VBORw0KGgoAAAANSUhEUgAAADI=="
|
||||||
|
}""",
|
||||||
|
""""0": {
|
||||||
|
"type": "artifact",
|
||||||
|
"mime_type": "image/jpeg",
|
||||||
|
"url": "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg",
|
||||||
|
"hashes": {
|
||||||
|
"MD5": "6826f9a05da08134006557758bb3afbb"
|
||||||
|
}
|
||||||
|
}""",
|
||||||
|
])
|
||||||
|
def test_parse_artifact_valid(data):
|
||||||
|
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
|
||||||
|
odata = stix2.parse(odata_str)
|
||||||
|
assert odata.objects["0"].type == "artifact"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("data", [
|
||||||
|
""""0": {
|
||||||
|
"type": "artifact",
|
||||||
|
"mime_type": "image/jpeg",
|
||||||
|
"payload_bin": "abcVBORw0KGgoAAAANSUhEUgAAADI=="
|
||||||
|
}""",
|
||||||
|
""""0": {
|
||||||
|
"type": "artifact",
|
||||||
|
"mime_type": "image/jpeg",
|
||||||
|
"url": "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg",
|
||||||
|
"hashes": {
|
||||||
|
"MD5": "a"
|
||||||
|
}
|
||||||
|
}""",
|
||||||
|
])
|
||||||
|
def test_parse_artifact_invalid(data):
|
||||||
|
odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
stix2.parse(odata_str)
|
||||||
|
|
||||||
# TODO: Add other examples
|
# TODO: Add other examples
|
||||||
|
|
Loading…
Reference in New Issue