From 2c67b906385e0391b5f22286badc9610abf2512d Mon Sep 17 00:00:00 2001 From: clenk Date: Wed, 3 May 2017 17:35:33 -0400 Subject: [PATCH] Add Artifact type --- stix2/__init__.py | 17 +++++++++--- stix2/observables.py | 26 ++++++++++++++++++ stix2/properties.py | 8 ++++-- stix2/sdo.py | 8 +++--- stix2/test/test_observed_data.py | 45 +++++++++++++++++++++++++++++++- 5 files changed, 93 insertions(+), 11 deletions(-) create mode 100644 stix2/observables.py diff --git a/stix2/__init__.py b/stix2/__init__.py index 187d18a..1ec1711 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -3,6 +3,7 @@ # flake8: noqa from .bundle import Bundle +from .observables import Artifact, File from .other import ExternalReference, KillChainPhase, MarkingDefinition, \ GranularMarking, StatementMarking, TLPMarking from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \ @@ -31,8 +32,13 @@ OBJ_MAP = { 'vulnerability': Vulnerability, } +OBJ_MAP_OBSERVABLE = { + 'artifact': Artifact, + 'file': File, +} -def parse(data): + +def parse(data, observable=False): """Deserialize a string or file-like object into a STIX object""" obj = get_dict(data) @@ -42,10 +48,13 @@ def parse(data): pass else: try: - obj_class = OBJ_MAP[obj['type']] - return obj_class(**obj) + if observable: + obj_class = OBJ_MAP_OBSERVABLE[obj['type']] + else: + obj_class = OBJ_MAP[obj['type']] except KeyError: # TODO handle custom objects - raise ValueError("Can't parse unknown object type!") + raise ValueError("Can't parse unknown object type '%s'!" % obj['type']) + return obj_class(**obj) return obj diff --git a/stix2/observables.py b/stix2/observables.py new file mode 100644 index 0000000..33eded2 --- /dev/null +++ b/stix2/observables.py @@ -0,0 +1,26 @@ +"""STIX 2.0 Cyber Observable Objects""" + +from .base import Observable +# from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, +# HashesProperty, HexProperty, IDProperty, +# IntegerProperty, ListProperty, ReferenceProperty, +# StringProperty, TimestampProperty, TypeProperty) +from .properties import BinaryProperty, HashesProperty, StringProperty, TypeProperty + + +class Artifact(Observable): + _type = 'artifact' + _properties = { + 'type': TypeProperty(_type), + 'mime_type': StringProperty(), + 'payload_bin': BinaryProperty(), + 'url': StringProperty(), + 'hashes': HashesProperty(), + } + + +class File(Observable): + _type = 'file' + _properties = { + 'type': TypeProperty(_type), + } diff --git a/stix2/properties.py b/stix2/properties.py index b8ce17b..4b4da7a 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -220,10 +220,14 @@ class ObservableProperty(Property): def clean(self, value): dictified = dict(value) - for obj in dictified: - if not issubclass(type(obj), Observable): + from .__init__ import parse # avoid circular import + for key, obj in dictified.items(): + parsed_obj = parse(obj, observable=True) + if not issubclass(type(parsed_obj), Observable): raise ValueError("Objects in an observable property must be " "Cyber Observable Objects") + dictified[key] = parsed_obj + return dictified diff --git a/stix2/sdo.py b/stix2/sdo.py index 693b750..ca1e5b4 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -3,9 +3,9 @@ from .base import _STIXBase from .common import COMMON_PROPERTIES from .other import KillChainPhase -from .properties import (IDProperty, IntegerProperty, ListProperty, Property, - ReferenceProperty, StringProperty, TimestampProperty, - TypeProperty) +from .properties import (IDProperty, IntegerProperty, ListProperty, + ObservableProperty, ReferenceProperty, + StringProperty, TimestampProperty, TypeProperty) from .utils import NOW @@ -126,7 +126,7 @@ class ObservedData(_STIXBase): 'first_observed': TimestampProperty(required=True), 'last_observed': TimestampProperty(required=True), 'number_observed': IntegerProperty(required=True), - 'objects': Property(), + 'objects': ObservableProperty(), }) diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index 52dc15b..154b395 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -1,4 +1,5 @@ import datetime as dt +import re import pytest import pytz @@ -70,6 +71,48 @@ def test_parse_observed_data(data): assert odata.first_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc) assert odata.last_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc) assert odata.created_by_ref == "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff" - # assert odata.objects["0"].type == "file" # TODO + assert odata.objects["0"].type == "file" + + +@pytest.mark.parametrize("data", [ + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "payload_bin": "VBORw0KGgoAAAANSUhEUgAAADI==" + }""", + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "url": "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg", + "hashes": { + "MD5": "6826f9a05da08134006557758bb3afbb" + } + }""", +]) +def test_parse_artifact_valid(data): + odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED) + odata = stix2.parse(odata_str) + assert odata.objects["0"].type == "artifact" + + +@pytest.mark.parametrize("data", [ + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "payload_bin": "abcVBORw0KGgoAAAANSUhEUgAAADI==" + }""", + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "url": "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg", + "hashes": { + "MD5": "a" + } + }""", +]) +def test_parse_artifact_invalid(data): + odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED) + with pytest.raises(ValueError): + stix2.parse(odata_str) # TODO: Add other examples