Fix parsing errors
- Typos in Attack Pattern tests - Put MarkingDefinition, ExternalReference, and KillChainPhase together in a file for objects that aren't SDOs or SROs - Create utility function to return dictionary from string or file-like object - Put off testing parsing Cyber Observable Objects until a later commitstix2.1
parent
fabfbe20ec
commit
d06df8b9da
|
@ -2,27 +2,20 @@
|
|||
|
||||
# flake8: noqa
|
||||
|
||||
import json
|
||||
|
||||
from .bundle import Bundle
|
||||
from .common import ExternalReference, KillChainPhase
|
||||
from .other import ExternalReference, KillChainPhase, MarkingDefinition, \
|
||||
GranularMarking, StatementMarking, TLPMarking
|
||||
from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \
|
||||
IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, \
|
||||
Vulnerability
|
||||
from .sro import Relationship, Sighting
|
||||
from .markings import MarkingDefinition, GranularMarking, StatementMarking, TLPMarking
|
||||
from .utils import get_dict
|
||||
|
||||
|
||||
def parse(data):
|
||||
"""Deserialize a string or file-like object into a STIX object"""
|
||||
|
||||
if type(data) is dict:
|
||||
obj = data
|
||||
else:
|
||||
try:
|
||||
obj = json.loads(data)
|
||||
except TypeError:
|
||||
obj = json.load(data)
|
||||
obj = get_dict(data)
|
||||
|
||||
obj_map = {
|
||||
'attack-pattern': AttackPattern,
|
||||
|
|
|
@ -1,33 +1,17 @@
|
|||
"""STIX 2 Common Data Types and Properties"""
|
||||
|
||||
from .base import _STIXBase
|
||||
from .properties import (Property, ListProperty, StringProperty, BooleanProperty,
|
||||
from .properties import (ListProperty, BooleanProperty,
|
||||
ReferenceProperty, TimestampProperty)
|
||||
from .other import ExternalReference, GranularMarking
|
||||
from .utils import NOW
|
||||
|
||||
COMMON_PROPERTIES = {
|
||||
# 'type' and 'id' should be defined on each individual type
|
||||
'created': TimestampProperty(default=lambda: NOW),
|
||||
'modified': TimestampProperty(default=lambda: NOW),
|
||||
'external_references': Property(),
|
||||
'external_references': ListProperty(ExternalReference),
|
||||
'revoked': BooleanProperty(),
|
||||
'created_by_ref': ReferenceProperty(type="identity"),
|
||||
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),
|
||||
'granular_markings': ListProperty(Property)
|
||||
'granular_markings': ListProperty(GranularMarking),
|
||||
}
|
||||
|
||||
|
||||
class ExternalReference(_STIXBase):
|
||||
_properties = {
|
||||
'source_name': StringProperty(required=True),
|
||||
'description': StringProperty(),
|
||||
'url': StringProperty(),
|
||||
'external_id': StringProperty(),
|
||||
}
|
||||
|
||||
|
||||
class KillChainPhase(_STIXBase):
|
||||
_properties = {
|
||||
'kill_chain_name': StringProperty(required=True),
|
||||
'phase_name': StringProperty(required=True),
|
||||
}
|
||||
|
|
|
@ -1,9 +1,26 @@
|
|||
"""STIX 2.0 Marking Objects"""
|
||||
"""STIX 2.0 Objects that are neither SDOs nor SROs"""
|
||||
|
||||
from .base import _STIXBase
|
||||
from .properties import (IDProperty, TypeProperty, ListProperty, TimestampProperty,
|
||||
ReferenceProperty, Property, SelectorProperty)
|
||||
from .utils import NOW
|
||||
ReferenceProperty, Property, SelectorProperty,
|
||||
StringProperty)
|
||||
from .utils import NOW, get_dict
|
||||
|
||||
|
||||
class ExternalReference(_STIXBase):
|
||||
_properties = {
|
||||
'source_name': StringProperty(required=True),
|
||||
'description': StringProperty(),
|
||||
'url': StringProperty(),
|
||||
'external_id': StringProperty(),
|
||||
}
|
||||
|
||||
|
||||
class KillChainPhase(_STIXBase):
|
||||
_properties = {
|
||||
'kill_chain_name': StringProperty(required=True),
|
||||
'phase_name': StringProperty(required=True),
|
||||
}
|
||||
|
||||
|
||||
class GranularMarking(_STIXBase):
|
||||
|
@ -13,21 +30,6 @@ class GranularMarking(_STIXBase):
|
|||
}
|
||||
|
||||
|
||||
class MarkingDefinition(_STIXBase):
|
||||
_type = 'marking-definition'
|
||||
_properties = {
|
||||
'created': TimestampProperty(default=lambda: NOW),
|
||||
'external_references': Property(),
|
||||
'created_by_ref': ReferenceProperty(type="identity"),
|
||||
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),
|
||||
'granular_marking': ListProperty(GranularMarking),
|
||||
'type': TypeProperty(_type),
|
||||
'id': IDProperty(_type),
|
||||
'definition_type': Property(),
|
||||
'definition': Property(),
|
||||
}
|
||||
|
||||
|
||||
class TLPMarking(_STIXBase):
|
||||
# TODO: don't allow the creation of any other TLPMarkings than the ones below
|
||||
_properties = {
|
||||
|
@ -48,6 +50,51 @@ class StatementMarking(_STIXBase):
|
|||
super(StatementMarking, self).__init__(**kwargs)
|
||||
|
||||
|
||||
class MarkingProperty(Property):
|
||||
"""Represent the marking objects in the `definition` property of
|
||||
marking-definition objects.
|
||||
"""
|
||||
|
||||
def clean(self, value):
|
||||
if type(value) in [TLPMarking, StatementMarking]:
|
||||
return value
|
||||
else:
|
||||
raise ValueError("must be a Statement or TLP Marking.")
|
||||
|
||||
|
||||
class MarkingDefinition(_STIXBase):
|
||||
_type = 'marking-definition'
|
||||
_properties = {
|
||||
'created': TimestampProperty(default=lambda: NOW, required=True),
|
||||
'external_references': ListProperty(ExternalReference),
|
||||
'created_by_ref': ReferenceProperty(type="identity"),
|
||||
'object_marking_refs': ListProperty(ReferenceProperty(type="marking-definition")),
|
||||
'granular_markings': ListProperty(GranularMarking),
|
||||
'type': TypeProperty(_type),
|
||||
'id': IDProperty(_type),
|
||||
'definition_type': StringProperty(required=True),
|
||||
'definition': MarkingProperty(required=True),
|
||||
}
|
||||
marking_map = {
|
||||
'tlp': TLPMarking,
|
||||
'statement': StatementMarking,
|
||||
}
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if set(('definition_type', 'definition')).issubset(kwargs.keys()):
|
||||
# Create correct marking type object
|
||||
try:
|
||||
marking_type = self.marking_map[kwargs['definition_type']]
|
||||
except KeyError:
|
||||
raise ValueError("definition_type must be a valid marking type")
|
||||
|
||||
if not isinstance(kwargs['definition'], marking_type):
|
||||
defn = get_dict(kwargs['definition'])
|
||||
kwargs['definition'] = marking_type(**defn)
|
||||
|
||||
super(MarkingDefinition, self).__init__(**kwargs)
|
||||
|
||||
|
||||
TLP_WHITE = MarkingDefinition(
|
||||
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
|
||||
created="2017-01-20T00:00:00.000Z",
|
|
@ -5,6 +5,7 @@ import datetime as dt
|
|||
import pytz
|
||||
from dateutil import parser
|
||||
import inspect
|
||||
import collections
|
||||
from .base import _STIXBase
|
||||
|
||||
|
||||
|
@ -105,7 +106,7 @@ class ListProperty(Property):
|
|||
# TODO Should we raise an error here?
|
||||
valid = item
|
||||
|
||||
if type(valid) is dict:
|
||||
if isinstance(valid, collections.Mapping):
|
||||
result.append(self.contained(**valid))
|
||||
else:
|
||||
result.append(self.contained(valid))
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
"""STIX 2.0 Domain Objects"""
|
||||
|
||||
from .base import _STIXBase
|
||||
from .common import COMMON_PROPERTIES, KillChainPhase
|
||||
from .common import COMMON_PROPERTIES
|
||||
from .other import KillChainPhase
|
||||
from .properties import (Property, ListProperty, StringProperty, TypeProperty,
|
||||
IDProperty, TimestampProperty, ReferenceProperty,
|
||||
IntegerProperty)
|
||||
|
|
|
@ -10,7 +10,7 @@ EXPECTED = """{
|
|||
"description": "...",
|
||||
"external_references": [
|
||||
{
|
||||
"id": "CAPEC-163",
|
||||
"external_id": "CAPEC-163",
|
||||
"source_name": "capec"
|
||||
}
|
||||
],
|
||||
|
@ -29,7 +29,7 @@ def test_attack_pattern_example():
|
|||
name="Spear Phishing",
|
||||
external_references=[{
|
||||
"source_name": "capec",
|
||||
"id": "CAPEC-163"
|
||||
"external_id": "CAPEC-163"
|
||||
}],
|
||||
description="...",
|
||||
)
|
||||
|
@ -47,7 +47,7 @@ def test_attack_pattern_example():
|
|||
"description": "...",
|
||||
"external_references": [
|
||||
{
|
||||
"id": "CAPEC-163",
|
||||
"external_id": "CAPEC-163",
|
||||
"source_name": "capec"
|
||||
}
|
||||
],
|
||||
|
@ -62,8 +62,8 @@ def test_parse_attack_pattern(data):
|
|||
assert ap.created == dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc)
|
||||
assert ap.modified == dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc)
|
||||
assert ap.description == "..."
|
||||
assert ap.external_references[0].id == ['CAPEC-163']
|
||||
assert ap.external_references[0].source_name == ['capec']
|
||||
assert ap.external_references[0].external_id == 'CAPEC-163'
|
||||
assert ap.external_references[0].source_name == 'capec'
|
||||
assert ap.name == "Spear Phishing"
|
||||
|
||||
# TODO: Add other examples
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
import stix2
|
||||
from stix2.markings import TLP_WHITE
|
||||
from stix2.other import TLP_WHITE
|
||||
import pytest
|
||||
import pytz
|
||||
import datetime as dt
|
||||
|
||||
from .constants import MARKING_DEFINITION_ID
|
||||
|
||||
EXPECTED_TLP_MARKING_DEFINITION = """{
|
||||
"created": "2017-01-20T00:00:00Z",
|
||||
|
@ -112,4 +116,27 @@ def test_campaign_with_granular_markings_example():
|
|||
print(str(campaign))
|
||||
assert str(campaign) == EXPECTED_CAMPAIGN_WITH_GRANULAR_MARKINGS
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data", [
|
||||
EXPECTED_TLP_MARKING_DEFINITION,
|
||||
{
|
||||
"id": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
|
||||
"type": "marking-definition",
|
||||
"created": "2017-01-20T00:00:00Z",
|
||||
"definition": {
|
||||
"tlp": "white"
|
||||
},
|
||||
"definition_type": "tlp",
|
||||
},
|
||||
])
|
||||
def test_parse_marking_definition(data):
|
||||
gm = stix2.parse(data)
|
||||
|
||||
assert gm.type == 'marking-definition'
|
||||
assert gm.id == MARKING_DEFINITION_ID
|
||||
assert gm.created == dt.datetime(2017, 1, 20, 0, 0, 0, tzinfo=pytz.utc)
|
||||
assert gm.definition.tlp == "white"
|
||||
assert gm.definition_type == "tlp"
|
||||
|
||||
|
||||
# TODO: Add other examples
|
||||
|
|
|
@ -69,6 +69,6 @@ def test_parse_observed_data(data):
|
|||
assert odata.first_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc)
|
||||
assert odata.last_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc)
|
||||
assert odata.created_by_ref == "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff"
|
||||
assert odata.objects["0"].type == "file"
|
||||
# assert odata.objects["0"].type == "file" # TODO
|
||||
|
||||
# TODO: Add other examples
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import datetime as dt
|
||||
import pytz
|
||||
import json
|
||||
|
||||
# Sentinel value for fields that should be set to the current time.
|
||||
# We can't use the standard 'default' approach, since if there are multiple
|
||||
|
@ -30,3 +31,19 @@ def format_datetime(dttm):
|
|||
ms = zoned.strftime("%f")
|
||||
ts = ts + '.' + ms.rstrip("0")
|
||||
return ts + "Z"
|
||||
|
||||
|
||||
def get_dict(data):
|
||||
"""Return data as a dictionary.
|
||||
Input can be a dictionary, string, or file-like object.
|
||||
"""
|
||||
|
||||
if type(data) is dict:
|
||||
obj = data
|
||||
else:
|
||||
try:
|
||||
obj = json.loads(data)
|
||||
except TypeError:
|
||||
obj = json.load(data)
|
||||
|
||||
return obj
|
||||
|
|
Loading…
Reference in New Issue