Merge pull request #14 from oasis-open/parse-cyber-observables

Parse cyber observables
stix2.1
Greg Back 2017-05-19 09:15:00 -05:00 committed by GitHub
commit a913d9d5ad
33 changed files with 2305 additions and 97 deletions

7
.isort.cfg Normal file
View File

@ -0,0 +1,7 @@
[settings]
check=1
diff=1
known_third_party=dateutil,pytest,pytz,six
known_first_party=stix2
not_skip=__init__.py
force_sort_within_sections=1

View File

@ -6,3 +6,8 @@
args:
- --max-line-length=160
- id: check-merge-conflict
- repo: https://github.com/FalconSocial/pre-commit-python-sorter
sha: 1.0.4
hooks:
- id: python-import-sorter

View File

@ -2,16 +2,26 @@
# flake8: noqa
from . import exceptions
from .bundle import Bundle
from .other import ExternalReference, KillChainPhase, MarkingDefinition, \
GranularMarking, StatementMarking, TLPMarking
from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \
IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, \
Vulnerability
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, Directory, DomainName,
EmailAddress, EmailMessage, EmailMIMEComponent, File,
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType)
from .other import (ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
IntrusionSet, Malware, ObservedData, Report, ThreatActor,
Tool, Vulnerability)
from .sro import Relationship, Sighting
from .utils import get_dict
from . import exceptions
OBJ_MAP = {
'attack-pattern': AttackPattern,
@ -31,6 +41,59 @@ OBJ_MAP = {
'vulnerability': Vulnerability,
}
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
}
def parse(data):
"""Deserialize a string or file-like object into a STIX object"""
@ -43,9 +106,35 @@ def parse(data):
else:
try:
obj_class = OBJ_MAP[obj['type']]
return obj_class(**obj)
except KeyError:
# TODO handle custom objects
raise ValueError("Can't parse unknown object type!")
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
return obj_class(**obj)
return obj
def parse_observable(data, _valid_refs):
"""Deserialize a string or file-like object into a STIX Cyber Observable
object.
"""
obj = get_dict(data)
obj['_valid_refs'] = _valid_refs
if 'type' not in obj:
raise ValueError("'type' is a required property!")
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
# TODO handle custom observable objects
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(**obj['extensions'][name])
return obj_class(**obj)

View File

@ -3,17 +3,16 @@
import collections
import copy
import datetime as dt
import json
from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \
MissingFieldsError, RevokeError, UnmodifiablePropertyError
from .utils import format_datetime, get_timestamp, NOW, parse_into_datetime
from .exceptions import (AtLeastOnePropertyError, DependentPropertiestError, ExtraFieldsError, ImmutableError,
InvalidObjRefError, InvalidValueError, MissingFieldsError, MutuallyExclusivePropertiesError,
RevokeError, UnmodifiablePropertyError)
from .utils import NOW, format_datetime, get_timestamp, parse_into_datetime
__all__ = ['STIXJSONEncoder', '_STIXBase']
DEFAULT_ERROR = "{type} must have {field}='{expected}'."
DEFAULT_ERROR = "{type} must have {property}='{expected}'."
class STIXJSONEncoder(json.JSONEncoder):
@ -48,6 +47,42 @@ class _STIXBase(collections.Mapping):
except ValueError as exc:
raise InvalidValueError(self.__class__, prop_name, reason=str(exc))
# interproperty constraint methods
def _check_mutually_exclusive_properties(self, list_of_properties, at_least_one=True):
current_properties = self.properties_populated()
count = len(set(list_of_properties).intersection(current_properties))
# at_least_one allows for xor to be checked
if count > 1 or (at_least_one and count == 0):
raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties)
def _check_at_least_one_property(self, list_of_properties=None):
if not list_of_properties:
list_of_properties = sorted(list(self.__class__._properties.keys()))
if "type" in list_of_properties:
list_of_properties.remove("type")
current_properties = self.properties_populated()
list_of_properties_populated = set(list_of_properties).intersection(current_properties)
if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(["extensions"])):
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties, values=[]):
failed_dependency_pairs = []
current_properties = self.properties_populated()
for p in list_of_properties:
v = values.pop() if values else None
for dp in list_of_dependent_properties:
if dp in current_properties and (p not in current_properties or (v and not current_properties(p) == v)):
failed_dependency_pairs.append((p, dp))
if failed_dependency_pairs:
raise DependentPropertiestError(self.__class__, failed_dependency_pairs)
def _check_object_constraints(self):
if self.granular_markings:
for m in self.granular_markings:
# TODO: check selectors
pass
def __init__(self, **kwargs):
cls = self.__class__
@ -65,9 +100,9 @@ class _STIXBase(collections.Mapping):
if prop_value:
setting_kwargs[prop_name] = prop_value
# Detect any missing required fields
required_fields = get_required_properties(cls._properties)
missing_kwargs = set(required_fields) - set(setting_kwargs)
# Detect any missing required properties
required_properties = get_required_properties(cls._properties)
missing_kwargs = set(required_properties) - set(setting_kwargs)
if missing_kwargs:
raise MissingFieldsError(cls, missing_kwargs)
@ -76,10 +111,7 @@ class _STIXBase(collections.Mapping):
self._inner = setting_kwargs
if self.granular_markings:
for m in self.granular_markings:
# TODO: check selectors
pass
self._check_object_constraints()
def __getitem__(self, key):
return self._inner[key]
@ -115,6 +147,9 @@ class _STIXBase(collections.Mapping):
cls = type(self)
return cls(**new_inner)
def properties_populated(self):
return list(self._inner.keys())
# Versioning API
def new_version(self, **kwargs):
@ -142,3 +177,55 @@ class _STIXBase(collections.Mapping):
if self.revoked:
raise RevokeError("revoke")
return self.new_version(revoked=True)
class _Observable(_STIXBase):
def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object
if '_valid_refs' in kwargs:
self._STIXBase__valid_refs = kwargs.pop('_valid_refs')
else:
self._STIXBase__valid_refs = []
super(_Observable, self).__init__(**kwargs)
def _check_ref(self, ref, prop, prop_name):
if ref not in self._STIXBase__valid_refs:
raise InvalidObjRefError(self.__class__, prop_name, "'%s' is not a valid object in local scope" % ref)
try:
allowed_types = prop.contained.valid_types
except AttributeError:
try:
allowed_types = prop.valid_types
except AttributeError:
raise ValueError("'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty or a ListProperty "
"containing ObjectReferenceProperty." % prop_name)
if allowed_types:
try:
ref_type = self._STIXBase__valid_refs[ref]
except TypeError:
raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__)
if ref_type not in allowed_types:
raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type))
def _check_property(self, prop_name, prop, kwargs):
super(_Observable, self)._check_property(prop_name, prop, kwargs)
if prop_name not in kwargs:
return
if prop_name.endswith('_ref'):
ref = kwargs[prop_name]
self._check_ref(ref, prop, prop_name)
elif prop_name.endswith('_refs'):
for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name)
class _Extension(_STIXBase):
def _check_object_constraints(self):
super(_Extension, self)._check_object_constraints()
self._check_at_least_one_property()

View File

@ -17,31 +17,31 @@ class InvalidValueError(STIXError, ValueError):
class MissingFieldsError(STIXError, ValueError):
"""Missing required field(s) when constructing STIX object."""
"""Missing one or more required properties when constructing STIX object."""
def __init__(self, cls, fields):
def __init__(self, cls, properties):
super(MissingFieldsError, self).__init__()
self.cls = cls
self.fields = sorted(list(fields))
self.properties = sorted(list(properties))
def __str__(self):
msg = "No values for required field(s) for {0}: ({1})."
msg = "No values for required properties for {0}: ({1})."
return msg.format(self.cls.__name__,
", ".join(x for x in self.fields))
", ".join(x for x in self.properties))
class ExtraFieldsError(STIXError, TypeError):
"""Extra field(s) were provided when constructing STIX object."""
"""One or more extra properties were provided when constructing STIX object."""
def __init__(self, cls, fields):
def __init__(self, cls, properties):
super(ExtraFieldsError, self).__init__()
self.cls = cls
self.fields = sorted(list(fields))
self.properties = sorted(list(properties))
def __str__(self):
msg = "Unexpected field(s) for {0}: ({1})."
msg = "Unexpected properties for {0}: ({1})."
return msg.format(self.cls.__name__,
", ".join(x for x in self.fields))
", ".join(x for x in self.properties))
class ImmutableError(STIXError, ValueError):
@ -51,6 +51,33 @@ class ImmutableError(STIXError, ValueError):
super(ImmutableError, self).__init__("Cannot modify properties after creation.")
class DictionaryKeyError(STIXError, ValueError):
"""Dictionary key does not conform to the correct format."""
def __init__(self, key, reason):
super(DictionaryKeyError, self).__init__()
self.key = key
self.reason = reason
def __str__(self):
msg = "Invalid dictionary key {0.key}: ({0.reason})."
return msg.format(self)
class InvalidObjRefError(STIXError, ValueError):
"""A STIX Cyber Observable Object contains an invalid object reference."""
def __init__(self, cls, prop_name, reason):
super(InvalidObjRefError, self).__init__()
self.cls = cls
self.prop_name = prop_name
self.reason = reason
def __str__(self):
msg = "Invalid object reference for '{0.cls.__name__}:{0.prop_name}': {0.reason}"
return msg.format(self)
class UnmodifiablePropertyError(STIXError, ValueError):
"""Attempted to modify an unmodifiable property of object when creating a new version"""
@ -63,6 +90,48 @@ class UnmodifiablePropertyError(STIXError, ValueError):
return msg.format(", ".join(self.unchangable_properties))
class MutuallyExclusivePropertiesError(STIXError, TypeError):
"""Violating interproperty mutually exclusive constraint of a STIX object type."""
def __init__(self, cls, properties):
super(MutuallyExclusivePropertiesError, self).__init__()
self.cls = cls
self.properties = sorted(list(properties))
def __str__(self):
msg = "The field(s) for {0}: ({1}) are mutually exclusive."
return msg.format(self.cls.__name__,
", ".join(x for x in self.properties))
class DependentPropertiestError(STIXError, TypeError):
"""Violating interproperty dependency constraint of a STIX object type."""
def __init__(self, cls, dependencies):
super(DependentPropertiestError, self).__init__()
self.cls = cls
self.dependencies = dependencies
def __str__(self):
msg = "The property dependencies for {0}: ({1}) are not met."
return msg.format(self.cls.__name__,
", ".join(x for x in self.dependencies))
class AtLeastOnePropertyError(STIXError, TypeError):
"""Violating a constraint of a STIX object type that at least one of the given properties must be populated."""
def __init__(self, cls, properties):
super(AtLeastOnePropertyError, self).__init__()
self.cls = cls
self.properties = sorted(list(properties))
def __str__(self):
msg = "At least one of the field(s) for {0}: ({1}) must be populated."
return msg.format(self.cls.__name__,
", ".join(x for x in self.properties))
class RevokeError(STIXError, ValueError):
"""Attempted to an operation on a revoked object"""

586
stix2/observables.py Normal file
View File

@ -0,0 +1,586 @@
"""STIX 2.0 Cyber Observable Objects
Embedded observable object types, such as Email MIME Component, which is
embedded in Email Message objects, inherit from _STIXBase instead of Observable
and do not have a '_type' attribute.
"""
from .base import _Extension, _Observable, _STIXBase
from .exceptions import AtLeastOnePropertyError
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty,
ExtensionsProperty, FloatProperty, HashesProperty,
HexProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
class Artifact(_Observable):
_type = 'artifact'
_properties = {
'type': TypeProperty(_type),
'mime_type': StringProperty(),
'payload_bin': BinaryProperty(),
'url': StringProperty(),
'hashes': HashesProperty(),
}
def _check_object_constraints(self):
super(Artifact, self)._check_object_constraints()
self._check_mutually_exclusive_properties(["payload_bin", "url"])
self._check_properties_dependency(["hashes"], ["url"])
class AutonomousSystem(_Observable):
_type = 'autonomous-system'
_properties = {
'type': TypeProperty(_type),
'number': IntegerProperty(),
'name': StringProperty(),
'rir': StringProperty(),
}
class Directory(_Observable):
_type = 'directory'
_properties = {
'type': TypeProperty(_type),
'path': StringProperty(required=True),
'path_enc': StringProperty(),
# these are not the created/modified timestamps of the object itself
'created': TimestampProperty(),
'modified': TimestampProperty(),
'accessed': TimestampProperty(),
'contains_refs': ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory'])),
}
class DomainName(_Observable):
_type = 'domain-name'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])),
}
class EmailAddress(_Observable):
_type = 'email-address'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
'display_name': StringProperty(),
'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'),
}
class EmailMIMEComponent(_STIXBase):
_properties = {
'body': StringProperty(),
'body_raw_ref': ObjectReferenceProperty(valid_types=['artifact', 'file']),
'content_type': StringProperty(),
'content_disposition': StringProperty(),
}
def _check_object_constraints(self):
super(EmailMIMEComponent, self)._check_object_constraints()
self._check_at_least_one_property(["body", "body_raw_ref"])
class EmailMessage(_Observable):
_type = 'email-message'
_properties = {
'type': TypeProperty(_type),
'is_multipart': BooleanProperty(required=True),
'date': TimestampProperty(),
'content_type': StringProperty(),
'from_ref': ObjectReferenceProperty(valid_types='email-addr'),
'sender_ref': ObjectReferenceProperty(valid_types='email-addr'),
'to_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')),
'cc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')),
'bcc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')),
'subject': StringProperty(),
'received_lines': ListProperty(StringProperty),
'additional_header_fields': DictionaryProperty(),
'body': StringProperty(),
'body_multipart': ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent)),
'raw_email_ref': ObjectReferenceProperty(valid_types='artifact'),
}
def _check_object_constraints(self):
super(EmailMessage, self)._check_object_constraints()
self._check_properties_dependency(["is_multipart"], ["body_multipart"])
# self._dependency(["is_multipart"], ["body"], [False])
class ArchiveExt(_Extension):
_properties = {
'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True),
'version': StringProperty(),
'comment': StringProperty(),
}
class AlternateDataStream(_STIXBase):
_properties = {
'name': StringProperty(required=True),
'hashes': HashesProperty(),
'size': IntegerProperty(),
}
class NTFSExt(_Extension):
_properties = {
'sid': StringProperty(),
'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)),
}
class PDFExt(_Extension):
_properties = {
'version': StringProperty(),
'is_optimized': BooleanProperty(),
'document_info_dict': DictionaryProperty(),
'pdfid0': StringProperty(),
'pdfid1': StringProperty(),
}
class RasterImageExt(_Extension):
_properties = {
'image_height': IntegerProperty(),
'image_weight': IntegerProperty(),
'bits_per_pixel': IntegerProperty(),
'image_compression_algorithm': StringProperty(),
'exif_tags': DictionaryProperty(),
}
class WindowsPEOptionalHeaderType(_STIXBase):
_properties = {
'magic_hex': HexProperty(),
'major_linker_version': IntegerProperty(),
'minor_linker_version': IntegerProperty(),
'size_of_code': IntegerProperty(),
'size_of_initialized_data': IntegerProperty(),
'size_of_uninitialized_data': IntegerProperty(),
'address_of_entry_point': IntegerProperty(),
'base_of_code': IntegerProperty(),
'base_of_data': IntegerProperty(),
'image_base': IntegerProperty(),
'section_alignment': IntegerProperty(),
'file_alignment': IntegerProperty(),
'major_os_version': IntegerProperty(),
'minor_os_version': IntegerProperty(),
'major_image_version': IntegerProperty(),
'minor_image_version': IntegerProperty(),
'major_subsystem_version': IntegerProperty(),
'minor_subsystem_version': IntegerProperty(),
'win32_version_value_hex': HexProperty(),
'size_of_image': IntegerProperty(),
'size_of_headers': IntegerProperty(),
'checksum_hex': HexProperty(),
'subsystem_hex': HexProperty(),
'dll_characteristics_hex': HexProperty(),
'size_of_stack_reserve': IntegerProperty(),
'size_of_stack_commit': IntegerProperty(),
'size_of_heap_reserve': IntegerProperty(),
'size_of_heap_commit': IntegerProperty(),
'loader_flags_hex': HexProperty(),
'number_of_rva_and_sizes': IntegerProperty(),
'hashes': HashesProperty(),
}
def _check_object_constraints(self):
super(WindowsPEOptionalHeaderType, self)._check_object_constraints()
self._check_at_least_one_property()
class WindowsPESection(_STIXBase):
_properties = {
'name': StringProperty(required=True),
'size': IntegerProperty(),
'entropy': FloatProperty(),
'hashes': HashesProperty(),
}
class WindowsPEBinaryExt(_Extension):
_properties = {
'pe_type': StringProperty(required=True), # open_vocab
'imphash': StringProperty(),
'machine_hex': HexProperty(),
'number_of_sections': IntegerProperty(),
'time_date_stamp': TimestampProperty(),
'pointer_to_symbol_table_hex': HexProperty(),
'number_of_symbols': IntegerProperty(),
'size_of_optional_header': IntegerProperty(),
'characteristics_hex': HexProperty(),
'file_header_hashes': HashesProperty(),
'optional_header': EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType),
'sections': ListProperty(EmbeddedObjectProperty(type=WindowsPESection)),
}
class File(_Observable):
_type = 'file'
_properties = {
'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'hashes': HashesProperty(),
'size': IntegerProperty(),
'name': StringProperty(),
'name_enc': StringProperty(),
'magic_number_hex': HexProperty(),
'mime_type': StringProperty(),
# these are not the created/modified timestamps of the object itself
'created': TimestampProperty(),
'modified': TimestampProperty(),
'accessed': TimestampProperty(),
'parent_directory_ref': ObjectReferenceProperty(valid_types='directory'),
'is_encrypted': BooleanProperty(),
'encryption_algorithm': StringProperty(),
'decryption_key': StringProperty(),
'contains_refs': ListProperty(ObjectReferenceProperty),
'content_ref': ObjectReferenceProperty(valid_types='artifact'),
}
def _check_object_constraints(self):
super(File, self)._check_object_constraints()
self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"])
self._check_at_least_one_property(["hashes", "name"])
class IPv4Address(_Observable):
_type = 'ipv4-addr'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')),
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')),
}
class IPv6Address(_Observable):
_type = 'ipv6-addr'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')),
'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')),
}
class MACAddress(_Observable):
_type = 'mac-addr'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
}
class Mutex(_Observable):
_type = 'mutex'
_properties = {
'type': TypeProperty(_type),
'name': StringProperty(),
}
class HTTPRequestExt(_Extension):
_properties = {
'request_method': StringProperty(required=True),
'request_value': StringProperty(required=True),
'request_version': StringProperty(),
'request_header': DictionaryProperty(),
'message_body_length': IntegerProperty(),
'message_body_data_ref': ObjectReferenceProperty(valid_types='artifact'),
}
class ICMPExt(_Extension):
_properties = {
'icmp_type_hex': HexProperty(required=True),
'icmp_code_hex': HexProperty(required=True),
}
class SocketExt(_Extension):
_properties = {
'address_family': EnumProperty([
"AF_UNSPEC",
"AF_INET",
"AF_IPX",
"AF_APPLETALK",
"AF_NETBIOS",
"AF_INET6",
"AF_IRDA",
"AF_BTH",
], required=True),
'is_blocking': BooleanProperty(),
'is_listening': BooleanProperty(),
'protocol_family': EnumProperty([
"PF_INET",
"PF_IPX",
"PF_APPLETALK",
"PF_INET6",
"PF_AX25",
"PF_NETROM"
]),
'options': DictionaryProperty(),
'socket_type': EnumProperty([
"SOCK_STREAM",
"SOCK_DGRAM",
"SOCK_RAW",
"SOCK_RDM",
"SOCK_SEQPACKET",
]),
}
class TCPExt(_Extension):
_properties = {
'src_flags_hex': HexProperty(),
'dst_flags_hex': HexProperty(),
}
class NetworkTraffic(_Observable):
_type = 'network-traffic'
_properties = {
'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'start': TimestampProperty(),
'end': TimestampProperty(),
'is_active': BooleanProperty(),
'src_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']),
'dst_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']),
'src_port': IntegerProperty(),
'dst_port': IntegerProperty(),
'protocols': ListProperty(StringProperty, required=True),
'src_byte_count': IntegerProperty(),
'dst_byte_count': IntegerProperty(),
'src_packets': IntegerProperty(),
'dst_packets': IntegerProperty(),
'ipfix': DictionaryProperty(),
'src_payload_ref': ObjectReferenceProperty(valid_types='artifact'),
'dst_payload_ref': ObjectReferenceProperty(valid_types='artifact'),
'encapsulates_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')),
'encapsulates_by_ref': ObjectReferenceProperty(valid_types='network-traffic'),
}
def _check_object_constraints(self):
super(NetworkTraffic, self)._check_object_constraints()
self._check_at_least_one_property(["src_ref", "dst_ref"])
class WindowsProcessExt(_Extension):
_properties = {
'aslr_enabled': BooleanProperty(),
'dep_enabled': BooleanProperty(),
'priority': StringProperty(),
'owner_sid': StringProperty(),
'window_title': StringProperty(),
'startup_info': DictionaryProperty(),
}
class WindowsServiceExt(_Extension):
_properties = {
'service_name': StringProperty(required=True),
'descriptions': ListProperty(StringProperty),
'display_name': StringProperty(),
'group_name': StringProperty(),
'start_type': EnumProperty([
"SERVICE_AUTO_START",
"SERVICE_BOOT_START",
"SERVICE_DEMAND_START",
"SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT",
]),
'service_dll_refs': ListProperty(ObjectReferenceProperty(valid_types='file')),
'service_type': EnumProperty([
"SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS",
]),
'service_status': EnumProperty([
"SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED",
"SERVICE_RUNNING",
"SERVICE_START_PENDING",
"SERVICE_STOP_PENDING",
"SERVICE_STOPPED",
]),
}
class Process(_Observable):
_type = 'process'
_properties = {
'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'is_hidden': BooleanProperty(),
'pid': IntegerProperty(),
'name': StringProperty(),
# this is not the created timestamps of the object itself
'created': TimestampProperty(),
'cwd': StringProperty(),
'arguments': ListProperty(StringProperty),
'command_line': StringProperty(),
'environment_variables': DictionaryProperty(),
'opened_connection_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')),
'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'),
'binary_ref': ObjectReferenceProperty(valid_types='file'),
'parent_ref': ObjectReferenceProperty(valid_types='process'),
'child_refs': ListProperty(ObjectReferenceProperty('process')),
}
def _check_object_constraints(self):
# no need to check windows-service-ext, since it has a required property
super(Process, self)._check_object_constraints()
try:
self._check_at_least_one_property()
if self.extensions and "windows-process-ext" in self.extensions:
self.extensions["windows-process-ext"]._check_at_least_one_property()
except AtLeastOnePropertyError as enclosing_exc:
if not self.extensions:
raise enclosing_exc
else:
if "windows-process-ext" in self.extensions:
self.extensions["windows-process-ext"]._check_at_least_one_property()
class Software(_Observable):
_type = 'software'
_properties = {
'type': TypeProperty(_type),
'name': StringProperty(required=True),
'cpe': StringProperty(),
'languages': ListProperty(StringProperty),
'vendor': StringProperty(),
'version': StringProperty(),
}
class URL(_Observable):
_type = 'url'
_properties = {
'type': TypeProperty(_type),
'value': StringProperty(required=True),
}
class UNIXAccountExt(_Extension):
_properties = {
'gid': IntegerProperty(),
'groups': ListProperty(StringProperty),
'home_dir': StringProperty(),
'shell': StringProperty(),
}
class UserAccount(_Observable):
_type = 'user-account'
_properties = {
'type': TypeProperty(_type),
'extensions': ExtensionsProperty(enclosing_type=_type),
'user_id': StringProperty(required=True),
'account_login': StringProperty(),
'account_type': StringProperty(), # open vocab
'display_name': StringProperty(),
'is_service_account': BooleanProperty(),
'is_privileged': BooleanProperty(),
'can_escalate_privs': BooleanProperty(),
'is_disabled': BooleanProperty(),
'account_created': TimestampProperty(),
'account_expires': TimestampProperty(),
'password_last_changed': TimestampProperty(),
'account_first_login': TimestampProperty(),
'account_last_login': TimestampProperty(),
}
class WindowsRegistryValueType(_STIXBase):
_type = 'windows-registry-value-type'
_properties = {
'name': StringProperty(required=True),
'data': StringProperty(),
'data_type': EnumProperty([
'REG_NONE',
'REG_SZ',
'REG_EXPAND_SZ',
'REG_BINARY',
'REG_DWORD',
'REG_DWORD_BIG_ENDIAN',
'REG_LINK',
'REG_MULTI_SZ',
'REG_RESOURCE_LIST',
'REG_FULL_RESOURCE_DESCRIPTION',
'REG_RESOURCE_REQUIREMENTS_LIST',
'REG_QWORD',
'REG_INVALID_TYPE',
]),
}
class WindowsRegistryKey(_Observable):
_type = 'windows-registry-key'
_properties = {
'type': TypeProperty(_type),
'key': StringProperty(required=True),
'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)),
# this is not the modified timestamps of the object itself
'modified': TimestampProperty(),
'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'),
'number_of_subkeys': IntegerProperty(),
}
@property
def values(self):
# Needed because 'values' is a property on collections.Mapping objects
return self._inner['values']
class X509V3ExtenstionsType(_STIXBase):
_type = 'x509-v3-extensions-type'
_properties = {
'basic_constraints': StringProperty(),
'name_constraints': StringProperty(),
'policy_constraints': StringProperty(),
'key_usage': StringProperty(),
'extended_key_usage': StringProperty(),
'subject_key_identifier': StringProperty(),
'authority_key_identifier': StringProperty(),
'subject_alternative_name': StringProperty(),
'issuer_alternative_name': StringProperty(),
'subject_directory_attributes': StringProperty(),
'crl_distribution_points': StringProperty(),
'inhibit_any_policy': StringProperty(),
'private_key_usage_period_not_before': TimestampProperty(),
'private_key_usage_period_not_after': TimestampProperty(),
'certificate_policies': StringProperty(),
'policy_mappings': StringProperty(),
}
class X509Certificate(_Observable):
_type = 'x509-certificate'
_properties = {
'type': TypeProperty(_type),
'is_self_signed': BooleanProperty(),
'hashes': HashesProperty(),
'version': StringProperty(),
'serial_number': StringProperty(),
'signature_algorithm': StringProperty(),
'issuer': StringProperty(),
'validity_not_before': TimestampProperty(),
'validity_not_after': TimestampProperty(),
'subject': StringProperty(),
'subject_public_key_algorithm': StringProperty(),
'subject_public_key_modulus': StringProperty(),
'subject_public_key_exponent': IntegerProperty(),
'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType),
}

View File

@ -4,7 +4,7 @@ from .base import _STIXBase
from .properties import (IDProperty, ListProperty, Property, ReferenceProperty,
SelectorProperty, StringProperty, TimestampProperty,
TypeProperty)
from .utils import get_dict, NOW
from .utils import NOW, get_dict
class ExternalReference(_STIXBase):
@ -15,6 +15,10 @@ class ExternalReference(_STIXBase):
'external_id': StringProperty(),
}
def _check_object_constraints(self):
super(ExternalReference, self)._check_object_constraints()
self._check_at_least_one_property(["description", "external_id", "url"])
class KillChainPhase(_STIXBase):
_properties = {

View File

@ -1,3 +1,5 @@
import base64
import binascii
import collections
import datetime as dt
import inspect
@ -8,7 +10,9 @@ from dateutil import parser
import pytz
from six import text_type
from .base import _STIXBase
from .base import _Observable, _STIXBase
from .exceptions import DictionaryKeyError
from .utils import get_dict
class Property(object):
@ -41,7 +45,7 @@ class Property(object):
- provide a default value for this property.
- `default()` can return the special value `NOW` to use the current
time. This is useful when several timestamps in the same object need
to use the same default value, so calling now() for each field--
to use the same default value, so calling now() for each property--
likely several microseconds apart-- does not work.
Subclasses can instead provide a lambda function for `default` as a keyword
@ -49,7 +53,7 @@ class Property(object):
raise their own exceptions.
When instantiating Properties, `required` and `default` should not be used
together. `default` implies that the field is required in the specification
together. `default` implies that the property is required in the specification
so this function will be used to supply a value if none is provided.
`required` means that the user must provide this; it is required in the
specification and we can't or don't want to create a default value.
@ -100,6 +104,12 @@ class ListProperty(Property):
iter(value)
except TypeError:
raise ValueError("must be an iterable.")
try:
if isinstance(value, basestring):
value = [value]
except NameError:
if isinstance(value, str):
value = [value]
result = []
for item in value:
@ -112,10 +122,15 @@ class ListProperty(Property):
# TODO Should we raise an error here?
valid = item
if isinstance(valid, collections.Mapping):
result.append(self.contained(**valid))
if type(self.contained) is EmbeddedObjectProperty:
obj_type = self.contained.type
else:
result.append(self.contained(valid))
obj_type = self.contained
if isinstance(valid, collections.Mapping):
result.append(obj_type(**valid))
else:
result.append(obj_type(valid))
# STIX spec forbids empty lists
if len(result) < 1:
@ -135,6 +150,7 @@ class StringProperty(Property):
class TypeProperty(Property):
def __init__(self, type):
super(TypeProperty, self).__init__(fixed=type)
@ -167,6 +183,14 @@ class IntegerProperty(Property):
raise ValueError("must be an integer.")
class FloatProperty(Property):
def clean(self, value):
try:
return float(value)
except Exception:
raise ValueError("must be a float.")
class BooleanProperty(Property):
def clean(self, value):
@ -213,11 +237,106 @@ class TimestampProperty(Property):
return pytz.utc.localize(parsed)
class ObservableProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The observable property must contain a dictionary")
valid_refs = dict((k, v['type']) for (k, v) in dictified.items())
from .__init__ import parse_observable # avoid circular import
for key, obj in dictified.items():
parsed_obj = parse_observable(obj, valid_refs)
if not issubclass(type(parsed_obj), _Observable):
raise ValueError("Objects in an observable property must be "
"Cyber Observable Objects")
dictified[key] = parsed_obj
return dictified
class DictionaryProperty(Property):
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The dictionary property must contain a dictionary")
for k in dictified.keys():
if len(k) < 3:
raise DictionaryKeyError(k, "shorter than 3 characters")
elif len(k) > 256:
raise DictionaryKeyError(k, "longer than 256 characters")
if not re.match('^[a-zA-Z0-9_-]+$', k):
raise DictionaryKeyError(k, "contains characters other than"
"lowercase a-z, uppercase A-Z, "
"numerals 0-9, hyphen (-), or "
"underscore (_)")
return dictified
HASHES_REGEX = {
"MD5": ("^[a-fA-F0-9]{32}$", "MD5"),
"MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"),
"RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"),
"SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"),
"SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"),
"SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"),
"SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"),
"SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"),
"SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"),
"SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"),
"SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"),
"SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"),
"SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"),
"WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"),
}
class HashesProperty(DictionaryProperty):
def clean(self, value):
clean_dict = super(HashesProperty, self).clean(value)
for k, v in clean_dict.items():
key = k.upper().replace('-', '')
if key in HASHES_REGEX:
vocab_key = HASHES_REGEX[key][1]
if not re.match(HASHES_REGEX[key][0], v):
raise ValueError("'%s' is not a valid %s hash" % (v, vocab_key))
if k != vocab_key:
clean_dict[vocab_key] = clean_dict[k]
del clean_dict[k]
return clean_dict
class BinaryProperty(Property):
def clean(self, value):
try:
base64.b64decode(value)
except (binascii.Error, TypeError):
raise ValueError("must contain a base64 encoded string")
return value
class HexProperty(Property):
def clean(self, value):
if not re.match('^([a-fA-F0-9]{2})+$', value):
raise ValueError("must contain an even number of hexadecimal characters")
return value
REF_REGEX = re.compile("^[a-z][a-z-]+[a-z]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}"
"-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
class ReferenceProperty(Property):
def __init__(self, required=False, type=None):
"""
references sometimes must be to a specific object type
@ -240,6 +359,7 @@ SELECTOR_REGEX = re.compile("^[a-z0-9_-]{3,250}(\\.(\\[\\d+\\]|[a-z0-9_-]{1,250}
class SelectorProperty(Property):
def __init__(self, type=None):
# ignore type
super(SelectorProperty, self).__init__()
@ -248,3 +368,71 @@ class SelectorProperty(Property):
if not SELECTOR_REGEX.match(value):
raise ValueError("must adhere to selector syntax.")
return value
class ObjectReferenceProperty(StringProperty):
def __init__(self, valid_types=None, **kwargs):
if valid_types and type(valid_types) is not list:
valid_types = [valid_types]
self.valid_types = valid_types
super(ObjectReferenceProperty, self).__init__(**kwargs)
class EmbeddedObjectProperty(Property):
def __init__(self, type, required=False):
self.type = type
super(EmbeddedObjectProperty, self).__init__(required, type=type)
def clean(self, value):
if type(value) is dict:
value = self.type(**value)
elif not isinstance(value, self.type):
raise ValueError("must be of type %s." % self.type.__name__)
return value
class EnumProperty(StringProperty):
def __init__(self, allowed, **kwargs):
if type(allowed) is not list:
allowed = list(allowed)
self.allowed = allowed
super(EnumProperty, self).__init__(**kwargs)
def clean(self, value):
value = super(EnumProperty, self).clean(value)
if value not in self.allowed:
raise ValueError("value '%s' is not valid for this enumeration." % value)
return self.string_type(value)
class ExtensionsProperty(DictionaryProperty):
def __init__(self, enclosing_type=None, required=False):
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
def clean(self, value):
try:
dictified = get_dict(value)
except ValueError:
raise ValueError("The extensions property must contain a dictionary")
from .__init__ import EXT_MAP # avoid circular import
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
for key, subvalue in dictified.items():
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
dictified[key] = cls(**subvalue)
elif type(subvalue) is cls:
dictified[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")
else:
raise ValueError("The key used in the extensions dictionary is not an extension type name")
else:
raise ValueError("The enclosing type has no extensions defined")
return dictified

View File

@ -3,9 +3,9 @@
from .base import _STIXBase
from .common import COMMON_PROPERTIES
from .other import KillChainPhase
from .properties import (IDProperty, IntegerProperty, ListProperty, Property,
ReferenceProperty, StringProperty, TimestampProperty,
TypeProperty)
from .properties import (IDProperty, IntegerProperty, ListProperty,
ObservableProperty, ReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
from .utils import NOW
@ -125,7 +125,7 @@ class ObservedData(_STIXBase):
'first_observed': TimestampProperty(required=True),
'last_observed': TimestampProperty(required=True),
'number_observed': IntegerProperty(required=True),
'objects': Property(),
'objects': ObservableProperty(),
})

View File

@ -5,8 +5,8 @@ import pytest
import stix2
from .constants import FAKE_TIME
from .constants import INDICATOR_KWARGS, MALWARE_KWARGS, RELATIONSHIP_KWARGS
from .constants import (FAKE_TIME, INDICATOR_KWARGS, MALWARE_KWARGS,
RELATIONSHIP_KWARGS)
# Inspired by: http://stackoverflow.com/a/24006251

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import ATTACK_PATTERN_ID
EXPECTED = """{
"created": "2016-05-12T08:17:27Z",
"description": "...",

View File

@ -2,6 +2,7 @@ import pytest
import stix2
EXPECTED_BUNDLE = """{
"id": "bundle--00000000-0000-0000-0000-000000000004",
"objects": [

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import CAMPAIGN_ID
EXPECTED = """{
"created": "2016-04-06T20:03:00Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import COURSE_OF_ACTION_ID
EXPECTED = """{
"created": "2016-04-06T20:03:48Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",

View File

@ -3,8 +3,10 @@
import re
import pytest
import stix2
VERIS = """{
"external_id": "0001AA7F-C601-424A-B2B8-BE6C9F5164E7",
"source_name": "veris",
@ -112,4 +114,4 @@ def test_external_reference_source_required():
stix2.ExternalReference()
assert excinfo.value.cls == stix2.ExternalReference
assert excinfo.value.fields == ["source_name"]
assert excinfo.value.properties == ["source_name"]

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import IDENTITY_ID
EXPECTED = """{
"created": "2015-12-21T19:59:11Z",
"id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c",

View File

@ -3,10 +3,12 @@ import re
import pytest
import pytz
import stix2
from .constants import FAKE_TIME, INDICATOR_ID, INDICATOR_KWARGS
EXPECTED_INDICATOR = """{
"created": "2017-01-01T00:00:01Z",
"id": "indicator--01234567-89ab-cdef-0123-456789abcdef",
@ -30,7 +32,7 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
""".split()) + ")"
def test_indicator_with_all_required_fields():
def test_indicator_with_all_required_properties():
now = dt.datetime(2017, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
epoch = dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
@ -49,7 +51,7 @@ def test_indicator_with_all_required_fields():
assert rep == EXPECTED_INDICATOR_REPR
def test_indicator_autogenerated_fields(indicator):
def test_indicator_autogenerated_properties(indicator):
assert indicator.type == 'indicator'
assert indicator.id == 'indicator--00000000-0000-0000-0000-000000000001'
assert indicator.created == FAKE_TIME
@ -87,21 +89,21 @@ def test_indicator_id_must_start_with_indicator():
assert str(excinfo.value) == "Invalid value for Indicator 'id': must start with 'indicator--'."
def test_indicator_required_fields():
def test_indicator_required_properties():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Indicator()
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ["labels", "pattern"]
assert str(excinfo.value) == "No values for required field(s) for Indicator: (labels, pattern)."
assert excinfo.value.properties == ["labels", "pattern"]
assert str(excinfo.value) == "No values for required properties for Indicator: (labels, pattern)."
def test_indicator_required_field_pattern():
def test_indicator_required_property_pattern():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Indicator(labels=['malicious-activity'])
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ["pattern"]
assert excinfo.value.properties == ["pattern"]
def test_indicator_created_ref_invalid_format():
@ -135,8 +137,8 @@ def test_invalid_kwarg_to_indicator():
stix2.Indicator(my_custom_property="foo", **INDICATOR_KWARGS)
assert excinfo.value.cls == stix2.Indicator
assert excinfo.value.fields == ['my_custom_property']
assert str(excinfo.value) == "Unexpected field(s) for Indicator: (my_custom_property)."
assert excinfo.value.properties == ['my_custom_property']
assert str(excinfo.value) == "Unexpected properties for Indicator: (my_custom_property)."
def test_created_modified_time_are_identical_by_default():

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import INTRUSION_SET_ID
EXPECTED = """{
"aliases": [
"Zookeeper"

View File

@ -4,6 +4,7 @@ import pytest
import stix2
LMCO_RECON = """{
"kill_chain_name": "lockheed-martin-cyber-kill-chain",
"phase_name": "reconnaissance"
@ -34,28 +35,28 @@ def test_kill_chain_example():
assert str(preattack) == FOO_PRE_ATTACK
def test_kill_chain_required_fields():
def test_kill_chain_required_properties():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.KillChainPhase()
assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["kill_chain_name", "phase_name"]
assert excinfo.value.properties == ["kill_chain_name", "phase_name"]
def test_kill_chain_required_field_chain_name():
def test_kill_chain_required_property_chain_name():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.KillChainPhase(phase_name="weaponization")
assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["kill_chain_name"]
assert excinfo.value.properties == ["kill_chain_name"]
def test_kill_chain_required_field_phase_name():
def test_kill_chain_required_property_phase_name():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.KillChainPhase(kill_chain_name="lockheed-martin-cyber-kill-chain")
assert excinfo.value.cls == stix2.KillChainPhase
assert excinfo.value.fields == ["phase_name"]
assert excinfo.value.properties == ["phase_name"]

View File

@ -3,10 +3,12 @@ import re
import pytest
import pytz
import stix2
from .constants import FAKE_TIME, MALWARE_ID, MALWARE_KWARGS
EXPECTED_MALWARE = """{
"created": "2016-05-12T08:17:27Z",
"id": "malware--fedcba98-7654-3210-fedc-ba9876543210",
@ -19,7 +21,7 @@ EXPECTED_MALWARE = """{
}"""
def test_malware_with_all_required_fields():
def test_malware_with_all_required_properties():
now = dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc)
mal = stix2.Malware(
@ -34,7 +36,7 @@ def test_malware_with_all_required_fields():
assert str(mal) == EXPECTED_MALWARE
def test_malware_autogenerated_fields(malware):
def test_malware_autogenerated_properties(malware):
assert malware.type == 'malware'
assert malware.id == 'malware--00000000-0000-0000-0000-000000000001'
assert malware.created == FAKE_TIME
@ -70,20 +72,20 @@ def test_malware_id_must_start_with_malware():
assert str(excinfo.value) == "Invalid value for Malware 'id': must start with 'malware--'."
def test_malware_required_fields():
def test_malware_required_properties():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Malware()
assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ["labels", "name"]
assert excinfo.value.properties == ["labels", "name"]
def test_malware_required_field_name():
def test_malware_required_property_name():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Malware(labels=['ransomware'])
assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ["name"]
assert excinfo.value.properties == ["name"]
def test_cannot_assign_to_malware_attributes(malware):
@ -98,8 +100,8 @@ def test_invalid_kwarg_to_malware():
stix2.Malware(my_custom_property="foo", **MALWARE_KWARGS)
assert excinfo.value.cls == stix2.Malware
assert excinfo.value.fields == ['my_custom_property']
assert str(excinfo.value) == "Unexpected field(s) for Malware: (my_custom_property)."
assert excinfo.value.properties == ['my_custom_property']
assert str(excinfo.value) == "Unexpected properties for Malware: (my_custom_property)."
@pytest.mark.parametrize("data", [

View File

@ -2,11 +2,13 @@ import datetime as dt
import pytest
import pytz
import stix2
from stix2.other import TLP_WHITE
from .constants import MARKING_DEFINITION_ID
EXPECTED_TLP_MARKING_DEFINITION = """{
"created": "2017-01-20T00:00:00Z",
"definition": {

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,15 @@
import pytest
from stix2.properties import (BooleanProperty, IDProperty, IntegerProperty,
from stix2 import TCPExt
from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError
from stix2.observables import EmailMIMEComponent
from stix2.properties import (BinaryProperty, BooleanProperty,
DictionaryProperty, EmbeddedObjectProperty,
EnumProperty, ExtensionsProperty, HashesProperty,
HexProperty, IDProperty, IntegerProperty,
ListProperty, Property, ReferenceProperty,
StringProperty, TimestampProperty, TypeProperty)
from .constants import FAKE_TIME
@ -171,3 +178,118 @@ def test_timestamp_property_invalid():
ts_prop.clean(1)
with pytest.raises(ValueError):
ts_prop.clean("someday sometime")
def test_binary_property():
bin_prop = BinaryProperty()
assert bin_prop.clean("TG9yZW0gSXBzdW0=")
with pytest.raises(ValueError):
bin_prop.clean("foobar")
def test_hex_property():
hex_prop = HexProperty()
assert hex_prop.clean("4c6f72656d20497073756d")
with pytest.raises(ValueError):
hex_prop.clean("foobar")
@pytest.mark.parametrize("d", [
{'description': 'something'},
[('abc', 1), ('bcd', 2), ('cde', 3)],
])
def test_dictionary_property_valid(d):
dict_prop = DictionaryProperty()
assert dict_prop.clean(d)
@pytest.mark.parametrize("d", [
{'a': 'something'},
{'a'*300: 'something'},
{'Hey!': 'something'},
])
def test_dictionary_property_invalid(d):
dict_prop = DictionaryProperty()
with pytest.raises(DictionaryKeyError):
dict_prop.clean(d)
@pytest.mark.parametrize("value", [
{"sha256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b"},
[('MD5', '2dfb1bcc980200c6706feee399d41b3f'), ('RIPEMD-160', 'b3a8cd8a27c90af79b3c81754f267780f443dfef')],
])
def test_hashes_property_valid(value):
hash_prop = HashesProperty()
assert hash_prop.clean(value)
@pytest.mark.parametrize("value", [
{"MD5": "a"},
{"SHA-256": "2dfb1bcc980200c6706feee399d41b3f"},
])
def test_hashes_property_invalid(value):
hash_prop = HashesProperty()
with pytest.raises(ValueError):
hash_prop.clean(value)
def test_embedded_property():
emb_prop = EmbeddedObjectProperty(type=EmailMIMEComponent)
mime = EmailMIMEComponent(
content_type="text/plain; charset=utf-8",
content_disposition="inline",
body="Cats are funny!"
)
assert emb_prop.clean(mime)
with pytest.raises(ValueError):
emb_prop.clean("string")
def test_enum_property():
enum_prop = EnumProperty(['a', 'b', 'c'])
assert enum_prop.clean('b')
with pytest.raises(ValueError):
enum_prop.clean('z')
def test_extension_property_valid():
ext_prop = ExtensionsProperty(enclosing_type='file')
assert ext_prop({
'windows-pebinary-ext': {
'pe_type': 'exe'
},
})
@pytest.mark.parametrize("data", [
1,
{'foobar-ext': {
'pe_type': 'exe'
}},
])
def test_extension_property_invalid(data):
ext_prop = ExtensionsProperty(enclosing_type='file')
with pytest.raises(ValueError):
ext_prop.clean(data)
def test_extension_property_invalid_type():
ext_prop = ExtensionsProperty(enclosing_type='indicator')
with pytest.raises(ValueError) as excinfo:
ext_prop.clean({
'windows-pebinary-ext': {
'pe_type': 'exe'
}}
)
assert 'no extensions defined' in str(excinfo.value)
def test_extension_at_least_one_property_constraint():
with pytest.raises(AtLeastOnePropertyError):
TCPExt()

View File

@ -5,8 +5,8 @@ import pytz
import stix2
from .constants import FAKE_TIME, INDICATOR_ID, MALWARE_ID, RELATIONSHIP_ID
from .constants import RELATIONSHIP_KWARGS
from .constants import (FAKE_TIME, INDICATOR_ID, MALWARE_ID, RELATIONSHIP_ID,
RELATIONSHIP_KWARGS)
EXPECTED_RELATIONSHIP = """{
@ -20,7 +20,7 @@ EXPECTED_RELATIONSHIP = """{
}"""
def test_relationship_all_required_fields():
def test_relationship_all_required_properties():
now = dt.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc)
rel = stix2.Relationship(
@ -35,7 +35,7 @@ def test_relationship_all_required_fields():
assert str(rel) == EXPECTED_RELATIONSHIP
def test_relationship_autogenerated_fields(relationship):
def test_relationship_autogenerated_properties(relationship):
assert relationship.type == 'relationship'
assert relationship.id == 'relationship--00000000-0000-0000-0000-000000000001'
assert relationship.created == FAKE_TIME
@ -73,22 +73,22 @@ def test_relationship_id_must_start_with_relationship():
assert str(excinfo.value) == "Invalid value for Relationship 'id': must start with 'relationship--'."
def test_relationship_required_field_relationship_type():
def test_relationship_required_property_relationship_type():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship()
assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["relationship_type", "source_ref", "target_ref"]
assert excinfo.value.properties == ["relationship_type", "source_ref", "target_ref"]
def test_relationship_missing_some_required_fields():
def test_relationship_missing_some_required_properties():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship(relationship_type='indicates')
assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["source_ref", "target_ref"]
assert excinfo.value.properties == ["source_ref", "target_ref"]
def test_relationship_required_field_target_ref():
def test_relationship_required_properties_target_ref():
with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo:
stix2.Relationship(
relationship_type='indicates',
@ -96,7 +96,7 @@ def test_relationship_required_field_target_ref():
)
assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ["target_ref"]
assert excinfo.value.properties == ["target_ref"]
def test_cannot_assign_to_relationship_attributes(relationship):
@ -111,8 +111,8 @@ def test_invalid_kwarg_to_relationship():
stix2.Relationship(my_custom_property="foo", **RELATIONSHIP_KWARGS)
assert excinfo.value.cls == stix2.Relationship
assert excinfo.value.fields == ['my_custom_property']
assert str(excinfo.value) == "Unexpected field(s) for Relationship: (my_custom_property)."
assert excinfo.value.properties == ['my_custom_property']
assert str(excinfo.value) == "Unexpected properties for Relationship: (my_custom_property)."
def test_create_relationship_from_objects_rather_than_ids(indicator, malware):

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import INDICATOR_KWARGS, REPORT_ID
EXPECTED = """{
"created": "2015-12-21T19:59:11Z",
"created_by_ref": "identity--a463ffb3-1bd9-4d94-b02d-74e4f1658283",

View File

@ -31,7 +31,7 @@ BAD_SIGHTING = """{
}"""
def test_sighting_all_required_fields():
def test_sighting_all_required_properties():
now = dt.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc)
s = stix2.Sighting(
@ -79,8 +79,8 @@ def test_invalid_kwarg_to_sighting():
stix2.Sighting(my_custom_property="foo", **SIGHTING_KWARGS)
assert excinfo.value.cls == stix2.Sighting
assert excinfo.value.fields == ['my_custom_property']
assert str(excinfo.value) == "Unexpected field(s) for Sighting: (my_custom_property)."
assert excinfo.value.properties == ['my_custom_property']
assert str(excinfo.value) == "Unexpected properties for Sighting: (my_custom_property)."
def test_create_sighting_from_objects_rather_than_ids(malware): # noqa: F811

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import THREAT_ACTOR_ID
EXPECTED = """{
"created": "2016-04-06T20:03:48Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import TOOL_ID
EXPECTED = """{
"created": "2016-04-06T20:03:48Z",
"created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",

View File

@ -1,4 +1,5 @@
import datetime as dt
from io import StringIO
import pytest
import pytz
@ -17,3 +18,24 @@ eastern = pytz.timezone('US/Eastern')
])
def test_timestamp_formatting(dttm, timestamp):
assert stix2.utils.format_datetime(dttm) == timestamp
@pytest.mark.parametrize('data', [
{"a": 1},
'{"a": 1}',
StringIO(u'{"a": 1}'),
[("a", 1,)],
])
def test_get_dict(data):
assert stix2.utils.get_dict(data)
@pytest.mark.parametrize('data', [
1,
[1],
['a', 1],
"foobar",
])
def test_get_dict_invalid(data):
with pytest.raises(ValueError):
stix2.utils.get_dict(data)

View File

@ -1,4 +1,5 @@
import pytest
import stix2
@ -141,7 +142,7 @@ def test_versioning_error_usetting_required_property():
campaign_v1.new_version(name=None)
assert excinfo.value.cls == stix2.Campaign
assert excinfo.value.fields == ["name"]
assert excinfo.value.properties == ["name"]
def test_versioning_error_new_version_of_revoked():

View File

@ -2,10 +2,12 @@ import datetime as dt
import pytest
import pytz
import stix2
from .constants import VULNERABILITY_ID
EXPECTED = """{
"created": "2016-05-12T08:17:27Z",
"external_references": [

View File

@ -63,11 +63,17 @@ def get_dict(data):
"""
if type(data) is dict:
obj = data
return data
else:
try:
obj = json.loads(data)
return json.loads(data)
except TypeError:
obj = json.load(data)
return obj
pass
try:
return json.load(data)
except AttributeError:
pass
try:
return dict(data)
except (ValueError, TypeError):
raise ValueError("Cannot convert '%s' to dictionary." % str(data))

View File

@ -1,5 +1,5 @@
[tox]
envlist = py26,py27,py33,py34,py35,py36,pycodestyle
envlist = py26,py27,py33,py34,py35,py36,pycodestyle,isort-check
[testenv]
deps =
@ -18,11 +18,10 @@ passenv = CI TRAVIS TRAVIS_*
[testenv:pycodestyle]
deps =
flake8
flake8-import-order
pycodestyle
commands =
pycodestyle ./stix2
flake8 --max-line-length=160 --import-order-style='google'
flake8 --max-line-length=160
[pycodestyle]
ignore=
@ -31,6 +30,10 @@ max-line-length=160
[flake8]
max-line-length=160
[testenv:isort-check]
deps = isort
commands = isort -rc stix2 -c -df
[travis]
python =
2.6: py26