diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..56f50d1 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,7 @@ +[settings] +check=1 +diff=1 +known_third_party=dateutil,pytest,pytz,six +known_first_party=stix2 +not_skip=__init__.py +force_sort_within_sections=1 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 309e53b..3dba8ef 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,3 +6,8 @@ args: - --max-line-length=160 - id: check-merge-conflict + +- repo: https://github.com/FalconSocial/pre-commit-python-sorter + sha: 1.0.4 + hooks: + - id: python-import-sorter diff --git a/stix2/__init__.py b/stix2/__init__.py index 187d18a..f3dfd20 100644 --- a/stix2/__init__.py +++ b/stix2/__init__.py @@ -2,16 +2,26 @@ # flake8: noqa +from . import exceptions from .bundle import Bundle -from .other import ExternalReference, KillChainPhase, MarkingDefinition, \ - GranularMarking, StatementMarking, TLPMarking -from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \ - IntrusionSet, Malware, ObservedData, Report, ThreatActor, Tool, \ - Vulnerability +from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact, + AutonomousSystem, Directory, DomainName, + EmailAddress, EmailMessage, EmailMIMEComponent, File, + HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address, + MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt, + Process, RasterImageExt, SocketExt, Software, TCPExt, + UNIXAccountExt, UserAccount, WindowsPEBinaryExt, + WindowsPEOptionalHeaderType, WindowsPESection, + WindowsProcessExt, WindowsRegistryKey, + WindowsRegistryValueType, WindowsServiceExt, + X509Certificate, X509V3ExtenstionsType) +from .other import (ExternalReference, GranularMarking, KillChainPhase, + MarkingDefinition, StatementMarking, TLPMarking) +from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator, + IntrusionSet, Malware, ObservedData, Report, ThreatActor, + Tool, Vulnerability) from .sro import Relationship, Sighting from .utils import get_dict -from . import exceptions - OBJ_MAP = { 'attack-pattern': AttackPattern, @@ -31,6 +41,59 @@ OBJ_MAP = { 'vulnerability': Vulnerability, } +OBJ_MAP_OBSERVABLE = { + 'artifact': Artifact, + 'autonomous-system': AutonomousSystem, + 'directory': Directory, + 'domain-name': DomainName, + 'email-address': EmailAddress, + 'email-message': EmailMessage, + 'file': File, + 'ipv4-addr': IPv4Address, + 'ipv6-addr': IPv6Address, + 'mac-addr': MACAddress, + 'mutex': Mutex, + 'network-traffic': NetworkTraffic, + 'process': Process, + 'software': Software, + 'url': URL, + 'user-account': UserAccount, + 'windows-registry-key': WindowsRegistryKey, + 'x509-certificate': X509Certificate, +} + +EXT_MAP_FILE = { + 'archive-ext': ArchiveExt, + 'ntfs-ext': NTFSExt, + 'pdf-ext': PDFExt, + 'raster-image-ext': RasterImageExt, + 'windows-pebinary-ext': WindowsPEBinaryExt +} + +EXT_MAP_NETWORK_TRAFFIC = { + 'http-request-ext': HTTPRequestExt, + 'icmp-ext': ICMPExt, + 'socket-ext': SocketExt, + 'tcp-ext': TCPExt, +} + +EXT_MAP_PROCESS = { + 'windows-process-ext': WindowsProcessExt, + 'windows-service-ext': WindowsServiceExt, +} + +EXT_MAP_USER_ACCOUNT = { + 'unix-account-ext': UNIXAccountExt, +} + +EXT_MAP = { + 'file': EXT_MAP_FILE, + 'network-traffic': EXT_MAP_NETWORK_TRAFFIC, + 'process': EXT_MAP_PROCESS, + 'user-account': EXT_MAP_USER_ACCOUNT, + +} + def parse(data): """Deserialize a string or file-like object into a STIX object""" @@ -43,9 +106,35 @@ def parse(data): else: try: obj_class = OBJ_MAP[obj['type']] - return obj_class(**obj) except KeyError: # TODO handle custom objects - raise ValueError("Can't parse unknown object type!") + raise ValueError("Can't parse unknown object type '%s'!" % obj['type']) + return obj_class(**obj) return obj + + +def parse_observable(data, _valid_refs): + """Deserialize a string or file-like object into a STIX Cyber Observable + object. + """ + + obj = get_dict(data) + obj['_valid_refs'] = _valid_refs + + if 'type' not in obj: + raise ValueError("'type' is a required property!") + try: + obj_class = OBJ_MAP_OBSERVABLE[obj['type']] + except KeyError: + # TODO handle custom observable objects + raise ValueError("Can't parse unknown object type '%s'!" % obj['type']) + + if 'extensions' in obj and obj['type'] in EXT_MAP: + for name, ext in obj['extensions'].items(): + if name not in EXT_MAP[obj['type']]: + raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type'])) + ext_class = EXT_MAP[obj['type']][name] + obj['extensions'][name] = ext_class(**obj['extensions'][name]) + + return obj_class(**obj) diff --git a/stix2/base.py b/stix2/base.py index 1c90dab..fc64883 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -3,17 +3,16 @@ import collections import copy import datetime as dt - import json - -from .exceptions import ExtraFieldsError, ImmutableError, InvalidValueError, \ - MissingFieldsError, RevokeError, UnmodifiablePropertyError -from .utils import format_datetime, get_timestamp, NOW, parse_into_datetime +from .exceptions import (AtLeastOnePropertyError, DependentPropertiestError, ExtraFieldsError, ImmutableError, + InvalidObjRefError, InvalidValueError, MissingFieldsError, MutuallyExclusivePropertiesError, + RevokeError, UnmodifiablePropertyError) +from .utils import NOW, format_datetime, get_timestamp, parse_into_datetime __all__ = ['STIXJSONEncoder', '_STIXBase'] -DEFAULT_ERROR = "{type} must have {field}='{expected}'." +DEFAULT_ERROR = "{type} must have {property}='{expected}'." class STIXJSONEncoder(json.JSONEncoder): @@ -48,6 +47,42 @@ class _STIXBase(collections.Mapping): except ValueError as exc: raise InvalidValueError(self.__class__, prop_name, reason=str(exc)) + # interproperty constraint methods + + def _check_mutually_exclusive_properties(self, list_of_properties, at_least_one=True): + current_properties = self.properties_populated() + count = len(set(list_of_properties).intersection(current_properties)) + # at_least_one allows for xor to be checked + if count > 1 or (at_least_one and count == 0): + raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties) + + def _check_at_least_one_property(self, list_of_properties=None): + if not list_of_properties: + list_of_properties = sorted(list(self.__class__._properties.keys())) + if "type" in list_of_properties: + list_of_properties.remove("type") + current_properties = self.properties_populated() + list_of_properties_populated = set(list_of_properties).intersection(current_properties) + if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(["extensions"])): + raise AtLeastOnePropertyError(self.__class__, list_of_properties) + + def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties, values=[]): + failed_dependency_pairs = [] + current_properties = self.properties_populated() + for p in list_of_properties: + v = values.pop() if values else None + for dp in list_of_dependent_properties: + if dp in current_properties and (p not in current_properties or (v and not current_properties(p) == v)): + failed_dependency_pairs.append((p, dp)) + if failed_dependency_pairs: + raise DependentPropertiestError(self.__class__, failed_dependency_pairs) + + def _check_object_constraints(self): + if self.granular_markings: + for m in self.granular_markings: + # TODO: check selectors + pass + def __init__(self, **kwargs): cls = self.__class__ @@ -65,9 +100,9 @@ class _STIXBase(collections.Mapping): if prop_value: setting_kwargs[prop_name] = prop_value - # Detect any missing required fields - required_fields = get_required_properties(cls._properties) - missing_kwargs = set(required_fields) - set(setting_kwargs) + # Detect any missing required properties + required_properties = get_required_properties(cls._properties) + missing_kwargs = set(required_properties) - set(setting_kwargs) if missing_kwargs: raise MissingFieldsError(cls, missing_kwargs) @@ -76,10 +111,7 @@ class _STIXBase(collections.Mapping): self._inner = setting_kwargs - if self.granular_markings: - for m in self.granular_markings: - # TODO: check selectors - pass + self._check_object_constraints() def __getitem__(self, key): return self._inner[key] @@ -115,6 +147,9 @@ class _STIXBase(collections.Mapping): cls = type(self) return cls(**new_inner) + def properties_populated(self): + return list(self._inner.keys()) + # Versioning API def new_version(self, **kwargs): @@ -142,3 +177,55 @@ class _STIXBase(collections.Mapping): if self.revoked: raise RevokeError("revoke") return self.new_version(revoked=True) + + +class _Observable(_STIXBase): + + def __init__(self, **kwargs): + # the constructor might be called independently of an observed data object + if '_valid_refs' in kwargs: + self._STIXBase__valid_refs = kwargs.pop('_valid_refs') + else: + self._STIXBase__valid_refs = [] + super(_Observable, self).__init__(**kwargs) + + def _check_ref(self, ref, prop, prop_name): + if ref not in self._STIXBase__valid_refs: + raise InvalidObjRefError(self.__class__, prop_name, "'%s' is not a valid object in local scope" % ref) + + try: + allowed_types = prop.contained.valid_types + except AttributeError: + try: + allowed_types = prop.valid_types + except AttributeError: + raise ValueError("'%s' is named like an object reference property but " + "is not an ObjectReferenceProperty or a ListProperty " + "containing ObjectReferenceProperty." % prop_name) + + if allowed_types: + try: + ref_type = self._STIXBase__valid_refs[ref] + except TypeError: + raise ValueError("'%s' must be created with _valid_refs as a dict, not a list." % self.__class__.__name__) + if ref_type not in allowed_types: + raise InvalidObjRefError(self.__class__, prop_name, "object reference '%s' is of an invalid type '%s'" % (ref, ref_type)) + + def _check_property(self, prop_name, prop, kwargs): + super(_Observable, self)._check_property(prop_name, prop, kwargs) + if prop_name not in kwargs: + return + + if prop_name.endswith('_ref'): + ref = kwargs[prop_name] + self._check_ref(ref, prop, prop_name) + elif prop_name.endswith('_refs'): + for ref in kwargs[prop_name]: + self._check_ref(ref, prop, prop_name) + + +class _Extension(_STIXBase): + + def _check_object_constraints(self): + super(_Extension, self)._check_object_constraints() + self._check_at_least_one_property() diff --git a/stix2/exceptions.py b/stix2/exceptions.py index 20e44a7..7c92dda 100644 --- a/stix2/exceptions.py +++ b/stix2/exceptions.py @@ -17,31 +17,31 @@ class InvalidValueError(STIXError, ValueError): class MissingFieldsError(STIXError, ValueError): - """Missing required field(s) when constructing STIX object.""" + """Missing one or more required properties when constructing STIX object.""" - def __init__(self, cls, fields): + def __init__(self, cls, properties): super(MissingFieldsError, self).__init__() self.cls = cls - self.fields = sorted(list(fields)) + self.properties = sorted(list(properties)) def __str__(self): - msg = "No values for required field(s) for {0}: ({1})." + msg = "No values for required properties for {0}: ({1})." return msg.format(self.cls.__name__, - ", ".join(x for x in self.fields)) + ", ".join(x for x in self.properties)) class ExtraFieldsError(STIXError, TypeError): - """Extra field(s) were provided when constructing STIX object.""" + """One or more extra properties were provided when constructing STIX object.""" - def __init__(self, cls, fields): + def __init__(self, cls, properties): super(ExtraFieldsError, self).__init__() self.cls = cls - self.fields = sorted(list(fields)) + self.properties = sorted(list(properties)) def __str__(self): - msg = "Unexpected field(s) for {0}: ({1})." + msg = "Unexpected properties for {0}: ({1})." return msg.format(self.cls.__name__, - ", ".join(x for x in self.fields)) + ", ".join(x for x in self.properties)) class ImmutableError(STIXError, ValueError): @@ -51,6 +51,33 @@ class ImmutableError(STIXError, ValueError): super(ImmutableError, self).__init__("Cannot modify properties after creation.") +class DictionaryKeyError(STIXError, ValueError): + """Dictionary key does not conform to the correct format.""" + + def __init__(self, key, reason): + super(DictionaryKeyError, self).__init__() + self.key = key + self.reason = reason + + def __str__(self): + msg = "Invalid dictionary key {0.key}: ({0.reason})." + return msg.format(self) + + +class InvalidObjRefError(STIXError, ValueError): + """A STIX Cyber Observable Object contains an invalid object reference.""" + + def __init__(self, cls, prop_name, reason): + super(InvalidObjRefError, self).__init__() + self.cls = cls + self.prop_name = prop_name + self.reason = reason + + def __str__(self): + msg = "Invalid object reference for '{0.cls.__name__}:{0.prop_name}': {0.reason}" + return msg.format(self) + + class UnmodifiablePropertyError(STIXError, ValueError): """Attempted to modify an unmodifiable property of object when creating a new version""" @@ -63,6 +90,48 @@ class UnmodifiablePropertyError(STIXError, ValueError): return msg.format(", ".join(self.unchangable_properties)) +class MutuallyExclusivePropertiesError(STIXError, TypeError): + """Violating interproperty mutually exclusive constraint of a STIX object type.""" + + def __init__(self, cls, properties): + super(MutuallyExclusivePropertiesError, self).__init__() + self.cls = cls + self.properties = sorted(list(properties)) + + def __str__(self): + msg = "The field(s) for {0}: ({1}) are mutually exclusive." + return msg.format(self.cls.__name__, + ", ".join(x for x in self.properties)) + + +class DependentPropertiestError(STIXError, TypeError): + """Violating interproperty dependency constraint of a STIX object type.""" + + def __init__(self, cls, dependencies): + super(DependentPropertiestError, self).__init__() + self.cls = cls + self.dependencies = dependencies + + def __str__(self): + msg = "The property dependencies for {0}: ({1}) are not met." + return msg.format(self.cls.__name__, + ", ".join(x for x in self.dependencies)) + + +class AtLeastOnePropertyError(STIXError, TypeError): + """Violating a constraint of a STIX object type that at least one of the given properties must be populated.""" + + def __init__(self, cls, properties): + super(AtLeastOnePropertyError, self).__init__() + self.cls = cls + self.properties = sorted(list(properties)) + + def __str__(self): + msg = "At least one of the field(s) for {0}: ({1}) must be populated." + return msg.format(self.cls.__name__, + ", ".join(x for x in self.properties)) + + class RevokeError(STIXError, ValueError): """Attempted to an operation on a revoked object""" diff --git a/stix2/observables.py b/stix2/observables.py new file mode 100644 index 0000000..ec936fd --- /dev/null +++ b/stix2/observables.py @@ -0,0 +1,586 @@ +"""STIX 2.0 Cyber Observable Objects + +Embedded observable object types, such as Email MIME Component, which is +embedded in Email Message objects, inherit from _STIXBase instead of Observable +and do not have a '_type' attribute. +""" + +from .base import _Extension, _Observable, _STIXBase +from .exceptions import AtLeastOnePropertyError +from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, + EmbeddedObjectProperty, EnumProperty, + ExtensionsProperty, FloatProperty, HashesProperty, + HexProperty, IntegerProperty, ListProperty, + ObjectReferenceProperty, StringProperty, + TimestampProperty, TypeProperty) + + +class Artifact(_Observable): + _type = 'artifact' + _properties = { + 'type': TypeProperty(_type), + 'mime_type': StringProperty(), + 'payload_bin': BinaryProperty(), + 'url': StringProperty(), + 'hashes': HashesProperty(), + } + + def _check_object_constraints(self): + super(Artifact, self)._check_object_constraints() + self._check_mutually_exclusive_properties(["payload_bin", "url"]) + self._check_properties_dependency(["hashes"], ["url"]) + + +class AutonomousSystem(_Observable): + _type = 'autonomous-system' + _properties = { + 'type': TypeProperty(_type), + 'number': IntegerProperty(), + 'name': StringProperty(), + 'rir': StringProperty(), + } + + +class Directory(_Observable): + _type = 'directory' + _properties = { + 'type': TypeProperty(_type), + 'path': StringProperty(required=True), + 'path_enc': StringProperty(), + # these are not the created/modified timestamps of the object itself + 'created': TimestampProperty(), + 'modified': TimestampProperty(), + 'accessed': TimestampProperty(), + 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types=['file', 'directory'])), + } + + +class DomainName(_Observable): + _type = 'domain-name' + _properties = { + 'type': TypeProperty(_type), + 'value': StringProperty(required=True), + 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name'])), + } + + +class EmailAddress(_Observable): + _type = 'email-address' + _properties = { + 'type': TypeProperty(_type), + 'value': StringProperty(required=True), + 'display_name': StringProperty(), + 'belongs_to_ref': ObjectReferenceProperty(valid_types='user-account'), + } + + +class EmailMIMEComponent(_STIXBase): + _properties = { + 'body': StringProperty(), + 'body_raw_ref': ObjectReferenceProperty(valid_types=['artifact', 'file']), + 'content_type': StringProperty(), + 'content_disposition': StringProperty(), + } + + def _check_object_constraints(self): + super(EmailMIMEComponent, self)._check_object_constraints() + self._check_at_least_one_property(["body", "body_raw_ref"]) + + +class EmailMessage(_Observable): + _type = 'email-message' + _properties = { + 'type': TypeProperty(_type), + 'is_multipart': BooleanProperty(required=True), + 'date': TimestampProperty(), + 'content_type': StringProperty(), + 'from_ref': ObjectReferenceProperty(valid_types='email-addr'), + 'sender_ref': ObjectReferenceProperty(valid_types='email-addr'), + 'to_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), + 'cc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), + 'bcc_refs': ListProperty(ObjectReferenceProperty(valid_types='email-addr')), + 'subject': StringProperty(), + 'received_lines': ListProperty(StringProperty), + 'additional_header_fields': DictionaryProperty(), + 'body': StringProperty(), + 'body_multipart': ListProperty(EmbeddedObjectProperty(type=EmailMIMEComponent)), + 'raw_email_ref': ObjectReferenceProperty(valid_types='artifact'), + } + + def _check_object_constraints(self): + super(EmailMessage, self)._check_object_constraints() + self._check_properties_dependency(["is_multipart"], ["body_multipart"]) + # self._dependency(["is_multipart"], ["body"], [False]) + + +class ArchiveExt(_Extension): + _properties = { + 'contains_refs': ListProperty(ObjectReferenceProperty(valid_types='file'), required=True), + 'version': StringProperty(), + 'comment': StringProperty(), + } + + +class AlternateDataStream(_STIXBase): + _properties = { + 'name': StringProperty(required=True), + 'hashes': HashesProperty(), + 'size': IntegerProperty(), + } + + +class NTFSExt(_Extension): + _properties = { + 'sid': StringProperty(), + 'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)), + } + + +class PDFExt(_Extension): + _properties = { + 'version': StringProperty(), + 'is_optimized': BooleanProperty(), + 'document_info_dict': DictionaryProperty(), + 'pdfid0': StringProperty(), + 'pdfid1': StringProperty(), + } + + +class RasterImageExt(_Extension): + _properties = { + 'image_height': IntegerProperty(), + 'image_weight': IntegerProperty(), + 'bits_per_pixel': IntegerProperty(), + 'image_compression_algorithm': StringProperty(), + 'exif_tags': DictionaryProperty(), + } + + +class WindowsPEOptionalHeaderType(_STIXBase): + _properties = { + 'magic_hex': HexProperty(), + 'major_linker_version': IntegerProperty(), + 'minor_linker_version': IntegerProperty(), + 'size_of_code': IntegerProperty(), + 'size_of_initialized_data': IntegerProperty(), + 'size_of_uninitialized_data': IntegerProperty(), + 'address_of_entry_point': IntegerProperty(), + 'base_of_code': IntegerProperty(), + 'base_of_data': IntegerProperty(), + 'image_base': IntegerProperty(), + 'section_alignment': IntegerProperty(), + 'file_alignment': IntegerProperty(), + 'major_os_version': IntegerProperty(), + 'minor_os_version': IntegerProperty(), + 'major_image_version': IntegerProperty(), + 'minor_image_version': IntegerProperty(), + 'major_subsystem_version': IntegerProperty(), + 'minor_subsystem_version': IntegerProperty(), + 'win32_version_value_hex': HexProperty(), + 'size_of_image': IntegerProperty(), + 'size_of_headers': IntegerProperty(), + 'checksum_hex': HexProperty(), + 'subsystem_hex': HexProperty(), + 'dll_characteristics_hex': HexProperty(), + 'size_of_stack_reserve': IntegerProperty(), + 'size_of_stack_commit': IntegerProperty(), + 'size_of_heap_reserve': IntegerProperty(), + 'size_of_heap_commit': IntegerProperty(), + 'loader_flags_hex': HexProperty(), + 'number_of_rva_and_sizes': IntegerProperty(), + 'hashes': HashesProperty(), + } + + def _check_object_constraints(self): + super(WindowsPEOptionalHeaderType, self)._check_object_constraints() + self._check_at_least_one_property() + + +class WindowsPESection(_STIXBase): + _properties = { + 'name': StringProperty(required=True), + 'size': IntegerProperty(), + 'entropy': FloatProperty(), + 'hashes': HashesProperty(), + } + + +class WindowsPEBinaryExt(_Extension): + _properties = { + 'pe_type': StringProperty(required=True), # open_vocab + 'imphash': StringProperty(), + 'machine_hex': HexProperty(), + 'number_of_sections': IntegerProperty(), + 'time_date_stamp': TimestampProperty(), + 'pointer_to_symbol_table_hex': HexProperty(), + 'number_of_symbols': IntegerProperty(), + 'size_of_optional_header': IntegerProperty(), + 'characteristics_hex': HexProperty(), + 'file_header_hashes': HashesProperty(), + 'optional_header': EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType), + 'sections': ListProperty(EmbeddedObjectProperty(type=WindowsPESection)), + } + + +class File(_Observable): + _type = 'file' + _properties = { + 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), + 'hashes': HashesProperty(), + 'size': IntegerProperty(), + 'name': StringProperty(), + 'name_enc': StringProperty(), + 'magic_number_hex': HexProperty(), + 'mime_type': StringProperty(), + # these are not the created/modified timestamps of the object itself + 'created': TimestampProperty(), + 'modified': TimestampProperty(), + 'accessed': TimestampProperty(), + 'parent_directory_ref': ObjectReferenceProperty(valid_types='directory'), + 'is_encrypted': BooleanProperty(), + 'encryption_algorithm': StringProperty(), + 'decryption_key': StringProperty(), + 'contains_refs': ListProperty(ObjectReferenceProperty), + 'content_ref': ObjectReferenceProperty(valid_types='artifact'), + } + + def _check_object_constraints(self): + super(File, self)._check_object_constraints() + self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"]) + self._check_at_least_one_property(["hashes", "name"]) + + +class IPv4Address(_Observable): + _type = 'ipv4-addr' + _properties = { + 'type': TypeProperty(_type), + 'value': StringProperty(required=True), + 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), + 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), + } + + +class IPv6Address(_Observable): + _type = 'ipv6-addr' + _properties = { + 'type': TypeProperty(_type), + 'value': StringProperty(required=True), + 'resolves_to_refs': ListProperty(ObjectReferenceProperty(valid_types='mac-addr')), + 'belongs_to_refs': ListProperty(ObjectReferenceProperty(valid_types='autonomous-system')), + } + + +class MACAddress(_Observable): + _type = 'mac-addr' + _properties = { + 'type': TypeProperty(_type), + 'value': StringProperty(required=True), + } + + +class Mutex(_Observable): + _type = 'mutex' + _properties = { + 'type': TypeProperty(_type), + 'name': StringProperty(), + } + + +class HTTPRequestExt(_Extension): + _properties = { + 'request_method': StringProperty(required=True), + 'request_value': StringProperty(required=True), + 'request_version': StringProperty(), + 'request_header': DictionaryProperty(), + 'message_body_length': IntegerProperty(), + 'message_body_data_ref': ObjectReferenceProperty(valid_types='artifact'), + } + + +class ICMPExt(_Extension): + _properties = { + 'icmp_type_hex': HexProperty(required=True), + 'icmp_code_hex': HexProperty(required=True), + } + + +class SocketExt(_Extension): + _properties = { + 'address_family': EnumProperty([ + "AF_UNSPEC", + "AF_INET", + "AF_IPX", + "AF_APPLETALK", + "AF_NETBIOS", + "AF_INET6", + "AF_IRDA", + "AF_BTH", + ], required=True), + 'is_blocking': BooleanProperty(), + 'is_listening': BooleanProperty(), + 'protocol_family': EnumProperty([ + "PF_INET", + "PF_IPX", + "PF_APPLETALK", + "PF_INET6", + "PF_AX25", + "PF_NETROM" + ]), + 'options': DictionaryProperty(), + 'socket_type': EnumProperty([ + "SOCK_STREAM", + "SOCK_DGRAM", + "SOCK_RAW", + "SOCK_RDM", + "SOCK_SEQPACKET", + ]), + } + + +class TCPExt(_Extension): + _properties = { + 'src_flags_hex': HexProperty(), + 'dst_flags_hex': HexProperty(), + } + + +class NetworkTraffic(_Observable): + _type = 'network-traffic' + _properties = { + 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), + 'start': TimestampProperty(), + 'end': TimestampProperty(), + 'is_active': BooleanProperty(), + 'src_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']), + 'dst_ref': ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'mac-addr', 'domain-name']), + 'src_port': IntegerProperty(), + 'dst_port': IntegerProperty(), + 'protocols': ListProperty(StringProperty, required=True), + 'src_byte_count': IntegerProperty(), + 'dst_byte_count': IntegerProperty(), + 'src_packets': IntegerProperty(), + 'dst_packets': IntegerProperty(), + 'ipfix': DictionaryProperty(), + 'src_payload_ref': ObjectReferenceProperty(valid_types='artifact'), + 'dst_payload_ref': ObjectReferenceProperty(valid_types='artifact'), + 'encapsulates_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')), + 'encapsulates_by_ref': ObjectReferenceProperty(valid_types='network-traffic'), + } + + def _check_object_constraints(self): + super(NetworkTraffic, self)._check_object_constraints() + self._check_at_least_one_property(["src_ref", "dst_ref"]) + + +class WindowsProcessExt(_Extension): + _properties = { + 'aslr_enabled': BooleanProperty(), + 'dep_enabled': BooleanProperty(), + 'priority': StringProperty(), + 'owner_sid': StringProperty(), + 'window_title': StringProperty(), + 'startup_info': DictionaryProperty(), + } + + +class WindowsServiceExt(_Extension): + _properties = { + 'service_name': StringProperty(required=True), + 'descriptions': ListProperty(StringProperty), + 'display_name': StringProperty(), + 'group_name': StringProperty(), + 'start_type': EnumProperty([ + "SERVICE_AUTO_START", + "SERVICE_BOOT_START", + "SERVICE_DEMAND_START", + "SERVICE_DISABLED", + "SERVICE_SYSTEM_ALERT", + ]), + 'service_dll_refs': ListProperty(ObjectReferenceProperty(valid_types='file')), + 'service_type': EnumProperty([ + "SERVICE_KERNEL_DRIVER", + "SERVICE_FILE_SYSTEM_DRIVER", + "SERVICE_WIN32_OWN_PROCESS", + "SERVICE_WIN32_SHARE_PROCESS", + ]), + 'service_status': EnumProperty([ + "SERVICE_CONTINUE_PENDING", + "SERVICE_PAUSE_PENDING", + "SERVICE_PAUSED", + "SERVICE_RUNNING", + "SERVICE_START_PENDING", + "SERVICE_STOP_PENDING", + "SERVICE_STOPPED", + ]), + } + + +class Process(_Observable): + _type = 'process' + _properties = { + 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), + 'is_hidden': BooleanProperty(), + 'pid': IntegerProperty(), + 'name': StringProperty(), + # this is not the created timestamps of the object itself + 'created': TimestampProperty(), + 'cwd': StringProperty(), + 'arguments': ListProperty(StringProperty), + 'command_line': StringProperty(), + 'environment_variables': DictionaryProperty(), + 'opened_connection_refs': ListProperty(ObjectReferenceProperty(valid_types='network-traffic')), + 'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'), + 'binary_ref': ObjectReferenceProperty(valid_types='file'), + 'parent_ref': ObjectReferenceProperty(valid_types='process'), + 'child_refs': ListProperty(ObjectReferenceProperty('process')), + } + + def _check_object_constraints(self): + # no need to check windows-service-ext, since it has a required property + super(Process, self)._check_object_constraints() + try: + self._check_at_least_one_property() + if self.extensions and "windows-process-ext" in self.extensions: + self.extensions["windows-process-ext"]._check_at_least_one_property() + except AtLeastOnePropertyError as enclosing_exc: + if not self.extensions: + raise enclosing_exc + else: + if "windows-process-ext" in self.extensions: + self.extensions["windows-process-ext"]._check_at_least_one_property() + + +class Software(_Observable): + _type = 'software' + _properties = { + 'type': TypeProperty(_type), + 'name': StringProperty(required=True), + 'cpe': StringProperty(), + 'languages': ListProperty(StringProperty), + 'vendor': StringProperty(), + 'version': StringProperty(), + } + + +class URL(_Observable): + _type = 'url' + _properties = { + 'type': TypeProperty(_type), + 'value': StringProperty(required=True), + } + + +class UNIXAccountExt(_Extension): + _properties = { + 'gid': IntegerProperty(), + 'groups': ListProperty(StringProperty), + 'home_dir': StringProperty(), + 'shell': StringProperty(), + } + + +class UserAccount(_Observable): + _type = 'user-account' + _properties = { + 'type': TypeProperty(_type), + 'extensions': ExtensionsProperty(enclosing_type=_type), + 'user_id': StringProperty(required=True), + 'account_login': StringProperty(), + 'account_type': StringProperty(), # open vocab + 'display_name': StringProperty(), + 'is_service_account': BooleanProperty(), + 'is_privileged': BooleanProperty(), + 'can_escalate_privs': BooleanProperty(), + 'is_disabled': BooleanProperty(), + 'account_created': TimestampProperty(), + 'account_expires': TimestampProperty(), + 'password_last_changed': TimestampProperty(), + 'account_first_login': TimestampProperty(), + 'account_last_login': TimestampProperty(), + } + + +class WindowsRegistryValueType(_STIXBase): + _type = 'windows-registry-value-type' + _properties = { + 'name': StringProperty(required=True), + 'data': StringProperty(), + 'data_type': EnumProperty([ + 'REG_NONE', + 'REG_SZ', + 'REG_EXPAND_SZ', + 'REG_BINARY', + 'REG_DWORD', + 'REG_DWORD_BIG_ENDIAN', + 'REG_LINK', + 'REG_MULTI_SZ', + 'REG_RESOURCE_LIST', + 'REG_FULL_RESOURCE_DESCRIPTION', + 'REG_RESOURCE_REQUIREMENTS_LIST', + 'REG_QWORD', + 'REG_INVALID_TYPE', + ]), + } + + +class WindowsRegistryKey(_Observable): + _type = 'windows-registry-key' + _properties = { + 'type': TypeProperty(_type), + 'key': StringProperty(required=True), + 'values': ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType)), + # this is not the modified timestamps of the object itself + 'modified': TimestampProperty(), + 'creator_user_ref': ObjectReferenceProperty(valid_types='user-account'), + 'number_of_subkeys': IntegerProperty(), + } + + @property + def values(self): + # Needed because 'values' is a property on collections.Mapping objects + return self._inner['values'] + + +class X509V3ExtenstionsType(_STIXBase): + _type = 'x509-v3-extensions-type' + _properties = { + 'basic_constraints': StringProperty(), + 'name_constraints': StringProperty(), + 'policy_constraints': StringProperty(), + 'key_usage': StringProperty(), + 'extended_key_usage': StringProperty(), + 'subject_key_identifier': StringProperty(), + 'authority_key_identifier': StringProperty(), + 'subject_alternative_name': StringProperty(), + 'issuer_alternative_name': StringProperty(), + 'subject_directory_attributes': StringProperty(), + 'crl_distribution_points': StringProperty(), + 'inhibit_any_policy': StringProperty(), + 'private_key_usage_period_not_before': TimestampProperty(), + 'private_key_usage_period_not_after': TimestampProperty(), + 'certificate_policies': StringProperty(), + 'policy_mappings': StringProperty(), + } + + +class X509Certificate(_Observable): + _type = 'x509-certificate' + _properties = { + 'type': TypeProperty(_type), + 'is_self_signed': BooleanProperty(), + 'hashes': HashesProperty(), + 'version': StringProperty(), + 'serial_number': StringProperty(), + 'signature_algorithm': StringProperty(), + 'issuer': StringProperty(), + 'validity_not_before': TimestampProperty(), + 'validity_not_after': TimestampProperty(), + 'subject': StringProperty(), + 'subject_public_key_algorithm': StringProperty(), + 'subject_public_key_modulus': StringProperty(), + 'subject_public_key_exponent': IntegerProperty(), + 'x509_v3_extensions': EmbeddedObjectProperty(type=X509V3ExtenstionsType), + } diff --git a/stix2/other.py b/stix2/other.py index 9b7f03a..51663b3 100644 --- a/stix2/other.py +++ b/stix2/other.py @@ -4,7 +4,7 @@ from .base import _STIXBase from .properties import (IDProperty, ListProperty, Property, ReferenceProperty, SelectorProperty, StringProperty, TimestampProperty, TypeProperty) -from .utils import get_dict, NOW +from .utils import NOW, get_dict class ExternalReference(_STIXBase): @@ -15,6 +15,10 @@ class ExternalReference(_STIXBase): 'external_id': StringProperty(), } + def _check_object_constraints(self): + super(ExternalReference, self)._check_object_constraints() + self._check_at_least_one_property(["description", "external_id", "url"]) + class KillChainPhase(_STIXBase): _properties = { diff --git a/stix2/properties.py b/stix2/properties.py index 76fe31b..bd1e3a2 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -1,3 +1,5 @@ +import base64 +import binascii import collections import datetime as dt import inspect @@ -8,7 +10,9 @@ from dateutil import parser import pytz from six import text_type -from .base import _STIXBase +from .base import _Observable, _STIXBase +from .exceptions import DictionaryKeyError +from .utils import get_dict class Property(object): @@ -41,7 +45,7 @@ class Property(object): - provide a default value for this property. - `default()` can return the special value `NOW` to use the current time. This is useful when several timestamps in the same object need - to use the same default value, so calling now() for each field-- + to use the same default value, so calling now() for each property-- likely several microseconds apart-- does not work. Subclasses can instead provide a lambda function for `default` as a keyword @@ -49,7 +53,7 @@ class Property(object): raise their own exceptions. When instantiating Properties, `required` and `default` should not be used - together. `default` implies that the field is required in the specification + together. `default` implies that the property is required in the specification so this function will be used to supply a value if none is provided. `required` means that the user must provide this; it is required in the specification and we can't or don't want to create a default value. @@ -100,6 +104,12 @@ class ListProperty(Property): iter(value) except TypeError: raise ValueError("must be an iterable.") + try: + if isinstance(value, basestring): + value = [value] + except NameError: + if isinstance(value, str): + value = [value] result = [] for item in value: @@ -112,10 +122,15 @@ class ListProperty(Property): # TODO Should we raise an error here? valid = item - if isinstance(valid, collections.Mapping): - result.append(self.contained(**valid)) + if type(self.contained) is EmbeddedObjectProperty: + obj_type = self.contained.type else: - result.append(self.contained(valid)) + obj_type = self.contained + + if isinstance(valid, collections.Mapping): + result.append(obj_type(**valid)) + else: + result.append(obj_type(valid)) # STIX spec forbids empty lists if len(result) < 1: @@ -135,6 +150,7 @@ class StringProperty(Property): class TypeProperty(Property): + def __init__(self, type): super(TypeProperty, self).__init__(fixed=type) @@ -167,6 +183,14 @@ class IntegerProperty(Property): raise ValueError("must be an integer.") +class FloatProperty(Property): + def clean(self, value): + try: + return float(value) + except Exception: + raise ValueError("must be a float.") + + class BooleanProperty(Property): def clean(self, value): @@ -213,11 +237,106 @@ class TimestampProperty(Property): return pytz.utc.localize(parsed) +class ObservableProperty(Property): + + def clean(self, value): + try: + dictified = get_dict(value) + except ValueError: + raise ValueError("The observable property must contain a dictionary") + + valid_refs = dict((k, v['type']) for (k, v) in dictified.items()) + + from .__init__ import parse_observable # avoid circular import + for key, obj in dictified.items(): + parsed_obj = parse_observable(obj, valid_refs) + if not issubclass(type(parsed_obj), _Observable): + raise ValueError("Objects in an observable property must be " + "Cyber Observable Objects") + dictified[key] = parsed_obj + + return dictified + + +class DictionaryProperty(Property): + + def clean(self, value): + try: + dictified = get_dict(value) + except ValueError: + raise ValueError("The dictionary property must contain a dictionary") + + for k in dictified.keys(): + if len(k) < 3: + raise DictionaryKeyError(k, "shorter than 3 characters") + elif len(k) > 256: + raise DictionaryKeyError(k, "longer than 256 characters") + if not re.match('^[a-zA-Z0-9_-]+$', k): + raise DictionaryKeyError(k, "contains characters other than" + "lowercase a-z, uppercase A-Z, " + "numerals 0-9, hyphen (-), or " + "underscore (_)") + return dictified + + +HASHES_REGEX = { + "MD5": ("^[a-fA-F0-9]{32}$", "MD5"), + "MD6": ("^[a-fA-F0-9]{32}|[a-fA-F0-9]{40}|[a-fA-F0-9]{56}|[a-fA-F0-9]{64}|[a-fA-F0-9]{96}|[a-fA-F0-9]{128}$", "MD6"), + "RIPEMD160": ("^[a-fA-F0-9]{40}$", "RIPEMD-160"), + "SHA1": ("^[a-fA-F0-9]{40}$", "SHA-1"), + "SHA224": ("^[a-fA-F0-9]{56}$", "SHA-224"), + "SHA256": ("^[a-fA-F0-9]{64}$", "SHA-256"), + "SHA384": ("^[a-fA-F0-9]{96}$", "SHA-384"), + "SHA512": ("^[a-fA-F0-9]{128}$", "SHA-512"), + "SHA3224": ("^[a-fA-F0-9]{56}$", "SHA3-224"), + "SHA3256": ("^[a-fA-F0-9]{64}$", "SHA3-256"), + "SHA3384": ("^[a-fA-F0-9]{96}$", "SHA3-384"), + "SHA3512": ("^[a-fA-F0-9]{128}$", "SHA3-512"), + "SSDEEP": ("^[a-zA-Z0-9/+:.]{1,128}$", "ssdeep"), + "WHIRLPOOL": ("^[a-fA-F0-9]{128}$", "WHIRLPOOL"), +} + + +class HashesProperty(DictionaryProperty): + + def clean(self, value): + clean_dict = super(HashesProperty, self).clean(value) + for k, v in clean_dict.items(): + key = k.upper().replace('-', '') + if key in HASHES_REGEX: + vocab_key = HASHES_REGEX[key][1] + if not re.match(HASHES_REGEX[key][0], v): + raise ValueError("'%s' is not a valid %s hash" % (v, vocab_key)) + if k != vocab_key: + clean_dict[vocab_key] = clean_dict[k] + del clean_dict[k] + return clean_dict + + +class BinaryProperty(Property): + + def clean(self, value): + try: + base64.b64decode(value) + except (binascii.Error, TypeError): + raise ValueError("must contain a base64 encoded string") + return value + + +class HexProperty(Property): + + def clean(self, value): + if not re.match('^([a-fA-F0-9]{2})+$', value): + raise ValueError("must contain an even number of hexadecimal characters") + return value + + REF_REGEX = re.compile("^[a-z][a-z-]+[a-z]--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}" "-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$") class ReferenceProperty(Property): + def __init__(self, required=False, type=None): """ references sometimes must be to a specific object type @@ -240,6 +359,7 @@ SELECTOR_REGEX = re.compile("^[a-z0-9_-]{3,250}(\\.(\\[\\d+\\]|[a-z0-9_-]{1,250} class SelectorProperty(Property): + def __init__(self, type=None): # ignore type super(SelectorProperty, self).__init__() @@ -248,3 +368,71 @@ class SelectorProperty(Property): if not SELECTOR_REGEX.match(value): raise ValueError("must adhere to selector syntax.") return value + + +class ObjectReferenceProperty(StringProperty): + + def __init__(self, valid_types=None, **kwargs): + if valid_types and type(valid_types) is not list: + valid_types = [valid_types] + self.valid_types = valid_types + super(ObjectReferenceProperty, self).__init__(**kwargs) + + +class EmbeddedObjectProperty(Property): + + def __init__(self, type, required=False): + self.type = type + super(EmbeddedObjectProperty, self).__init__(required, type=type) + + def clean(self, value): + if type(value) is dict: + value = self.type(**value) + elif not isinstance(value, self.type): + raise ValueError("must be of type %s." % self.type.__name__) + return value + + +class EnumProperty(StringProperty): + + def __init__(self, allowed, **kwargs): + if type(allowed) is not list: + allowed = list(allowed) + self.allowed = allowed + super(EnumProperty, self).__init__(**kwargs) + + def clean(self, value): + value = super(EnumProperty, self).clean(value) + if value not in self.allowed: + raise ValueError("value '%s' is not valid for this enumeration." % value) + return self.string_type(value) + + +class ExtensionsProperty(DictionaryProperty): + def __init__(self, enclosing_type=None, required=False): + self.enclosing_type = enclosing_type + super(ExtensionsProperty, self).__init__(required) + + def clean(self, value): + try: + dictified = get_dict(value) + except ValueError: + raise ValueError("The extensions property must contain a dictionary") + + from .__init__ import EXT_MAP # avoid circular import + if self.enclosing_type in EXT_MAP: + specific_type_map = EXT_MAP[self.enclosing_type] + for key, subvalue in dictified.items(): + if key in specific_type_map: + cls = specific_type_map[key] + if type(subvalue) is dict: + dictified[key] = cls(**subvalue) + elif type(subvalue) is cls: + dictified[key] = subvalue + else: + raise ValueError("Cannot determine extension type.") + else: + raise ValueError("The key used in the extensions dictionary is not an extension type name") + else: + raise ValueError("The enclosing type has no extensions defined") + return dictified diff --git a/stix2/sdo.py b/stix2/sdo.py index a2f0062..73bb34d 100644 --- a/stix2/sdo.py +++ b/stix2/sdo.py @@ -3,9 +3,9 @@ from .base import _STIXBase from .common import COMMON_PROPERTIES from .other import KillChainPhase -from .properties import (IDProperty, IntegerProperty, ListProperty, Property, - ReferenceProperty, StringProperty, TimestampProperty, - TypeProperty) +from .properties import (IDProperty, IntegerProperty, ListProperty, + ObservableProperty, ReferenceProperty, StringProperty, + TimestampProperty, TypeProperty) from .utils import NOW @@ -125,7 +125,7 @@ class ObservedData(_STIXBase): 'first_observed': TimestampProperty(required=True), 'last_observed': TimestampProperty(required=True), 'number_observed': IntegerProperty(required=True), - 'objects': Property(), + 'objects': ObservableProperty(), }) diff --git a/stix2/test/conftest.py b/stix2/test/conftest.py index ec967b5..d1f3330 100644 --- a/stix2/test/conftest.py +++ b/stix2/test/conftest.py @@ -5,8 +5,8 @@ import pytest import stix2 -from .constants import FAKE_TIME -from .constants import INDICATOR_KWARGS, MALWARE_KWARGS, RELATIONSHIP_KWARGS +from .constants import (FAKE_TIME, INDICATOR_KWARGS, MALWARE_KWARGS, + RELATIONSHIP_KWARGS) # Inspired by: http://stackoverflow.com/a/24006251 diff --git a/stix2/test/test_attack_pattern.py b/stix2/test/test_attack_pattern.py index 618875e..7510888 100644 --- a/stix2/test/test_attack_pattern.py +++ b/stix2/test/test_attack_pattern.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import ATTACK_PATTERN_ID + EXPECTED = """{ "created": "2016-05-12T08:17:27Z", "description": "...", diff --git a/stix2/test/test_bundle.py b/stix2/test/test_bundle.py index 6386a66..a849f3e 100644 --- a/stix2/test/test_bundle.py +++ b/stix2/test/test_bundle.py @@ -2,6 +2,7 @@ import pytest import stix2 + EXPECTED_BUNDLE = """{ "id": "bundle--00000000-0000-0000-0000-000000000004", "objects": [ diff --git a/stix2/test/test_campaign.py b/stix2/test/test_campaign.py index 7f6c4e6..9920019 100644 --- a/stix2/test/test_campaign.py +++ b/stix2/test/test_campaign.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import CAMPAIGN_ID + EXPECTED = """{ "created": "2016-04-06T20:03:00Z", "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", diff --git a/stix2/test/test_course_of_action.py b/stix2/test/test_course_of_action.py index f566e10..263eae2 100644 --- a/stix2/test/test_course_of_action.py +++ b/stix2/test/test_course_of_action.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import COURSE_OF_ACTION_ID + EXPECTED = """{ "created": "2016-04-06T20:03:48Z", "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", diff --git a/stix2/test/test_external_reference.py b/stix2/test/test_external_reference.py index 1e25fd2..61cfea3 100644 --- a/stix2/test/test_external_reference.py +++ b/stix2/test/test_external_reference.py @@ -3,8 +3,10 @@ import re import pytest + import stix2 + VERIS = """{ "external_id": "0001AA7F-C601-424A-B2B8-BE6C9F5164E7", "source_name": "veris", @@ -112,4 +114,4 @@ def test_external_reference_source_required(): stix2.ExternalReference() assert excinfo.value.cls == stix2.ExternalReference - assert excinfo.value.fields == ["source_name"] + assert excinfo.value.properties == ["source_name"] diff --git a/stix2/test/test_identity.py b/stix2/test/test_identity.py index 41c87bb..3952d25 100644 --- a/stix2/test/test_identity.py +++ b/stix2/test/test_identity.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import IDENTITY_ID + EXPECTED = """{ "created": "2015-12-21T19:59:11Z", "id": "identity--311b2d2d-f010-5473-83ec-1edf84858f4c", diff --git a/stix2/test/test_indicator.py b/stix2/test/test_indicator.py index baf4b38..2afb473 100644 --- a/stix2/test/test_indicator.py +++ b/stix2/test/test_indicator.py @@ -3,10 +3,12 @@ import re import pytest import pytz + import stix2 from .constants import FAKE_TIME, INDICATOR_ID, INDICATOR_KWARGS + EXPECTED_INDICATOR = """{ "created": "2017-01-01T00:00:01Z", "id": "indicator--01234567-89ab-cdef-0123-456789abcdef", @@ -30,7 +32,7 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(""" """.split()) + ")" -def test_indicator_with_all_required_fields(): +def test_indicator_with_all_required_properties(): now = dt.datetime(2017, 1, 1, 0, 0, 1, tzinfo=pytz.utc) epoch = dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc) @@ -49,7 +51,7 @@ def test_indicator_with_all_required_fields(): assert rep == EXPECTED_INDICATOR_REPR -def test_indicator_autogenerated_fields(indicator): +def test_indicator_autogenerated_properties(indicator): assert indicator.type == 'indicator' assert indicator.id == 'indicator--00000000-0000-0000-0000-000000000001' assert indicator.created == FAKE_TIME @@ -87,21 +89,21 @@ def test_indicator_id_must_start_with_indicator(): assert str(excinfo.value) == "Invalid value for Indicator 'id': must start with 'indicator--'." -def test_indicator_required_fields(): +def test_indicator_required_properties(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Indicator() assert excinfo.value.cls == stix2.Indicator - assert excinfo.value.fields == ["labels", "pattern"] - assert str(excinfo.value) == "No values for required field(s) for Indicator: (labels, pattern)." + assert excinfo.value.properties == ["labels", "pattern"] + assert str(excinfo.value) == "No values for required properties for Indicator: (labels, pattern)." -def test_indicator_required_field_pattern(): +def test_indicator_required_property_pattern(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Indicator(labels=['malicious-activity']) assert excinfo.value.cls == stix2.Indicator - assert excinfo.value.fields == ["pattern"] + assert excinfo.value.properties == ["pattern"] def test_indicator_created_ref_invalid_format(): @@ -135,8 +137,8 @@ def test_invalid_kwarg_to_indicator(): stix2.Indicator(my_custom_property="foo", **INDICATOR_KWARGS) assert excinfo.value.cls == stix2.Indicator - assert excinfo.value.fields == ['my_custom_property'] - assert str(excinfo.value) == "Unexpected field(s) for Indicator: (my_custom_property)." + assert excinfo.value.properties == ['my_custom_property'] + assert str(excinfo.value) == "Unexpected properties for Indicator: (my_custom_property)." def test_created_modified_time_are_identical_by_default(): diff --git a/stix2/test/test_intrusion_set.py b/stix2/test/test_intrusion_set.py index 19fb641..3241ced 100644 --- a/stix2/test/test_intrusion_set.py +++ b/stix2/test/test_intrusion_set.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import INTRUSION_SET_ID + EXPECTED = """{ "aliases": [ "Zookeeper" diff --git a/stix2/test/test_kill_chain_phases.py b/stix2/test/test_kill_chain_phases.py index 037e930..2596095 100644 --- a/stix2/test/test_kill_chain_phases.py +++ b/stix2/test/test_kill_chain_phases.py @@ -4,6 +4,7 @@ import pytest import stix2 + LMCO_RECON = """{ "kill_chain_name": "lockheed-martin-cyber-kill-chain", "phase_name": "reconnaissance" @@ -34,28 +35,28 @@ def test_kill_chain_example(): assert str(preattack) == FOO_PRE_ATTACK -def test_kill_chain_required_fields(): +def test_kill_chain_required_properties(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.KillChainPhase() assert excinfo.value.cls == stix2.KillChainPhase - assert excinfo.value.fields == ["kill_chain_name", "phase_name"] + assert excinfo.value.properties == ["kill_chain_name", "phase_name"] -def test_kill_chain_required_field_chain_name(): +def test_kill_chain_required_property_chain_name(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.KillChainPhase(phase_name="weaponization") assert excinfo.value.cls == stix2.KillChainPhase - assert excinfo.value.fields == ["kill_chain_name"] + assert excinfo.value.properties == ["kill_chain_name"] -def test_kill_chain_required_field_phase_name(): +def test_kill_chain_required_property_phase_name(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.KillChainPhase(kill_chain_name="lockheed-martin-cyber-kill-chain") assert excinfo.value.cls == stix2.KillChainPhase - assert excinfo.value.fields == ["phase_name"] + assert excinfo.value.properties == ["phase_name"] diff --git a/stix2/test/test_malware.py b/stix2/test/test_malware.py index f3bfccd..23dc68e 100644 --- a/stix2/test/test_malware.py +++ b/stix2/test/test_malware.py @@ -3,10 +3,12 @@ import re import pytest import pytz + import stix2 from .constants import FAKE_TIME, MALWARE_ID, MALWARE_KWARGS + EXPECTED_MALWARE = """{ "created": "2016-05-12T08:17:27Z", "id": "malware--fedcba98-7654-3210-fedc-ba9876543210", @@ -19,7 +21,7 @@ EXPECTED_MALWARE = """{ }""" -def test_malware_with_all_required_fields(): +def test_malware_with_all_required_properties(): now = dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc) mal = stix2.Malware( @@ -34,7 +36,7 @@ def test_malware_with_all_required_fields(): assert str(mal) == EXPECTED_MALWARE -def test_malware_autogenerated_fields(malware): +def test_malware_autogenerated_properties(malware): assert malware.type == 'malware' assert malware.id == 'malware--00000000-0000-0000-0000-000000000001' assert malware.created == FAKE_TIME @@ -70,20 +72,20 @@ def test_malware_id_must_start_with_malware(): assert str(excinfo.value) == "Invalid value for Malware 'id': must start with 'malware--'." -def test_malware_required_fields(): +def test_malware_required_properties(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Malware() assert excinfo.value.cls == stix2.Malware - assert excinfo.value.fields == ["labels", "name"] + assert excinfo.value.properties == ["labels", "name"] -def test_malware_required_field_name(): +def test_malware_required_property_name(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Malware(labels=['ransomware']) assert excinfo.value.cls == stix2.Malware - assert excinfo.value.fields == ["name"] + assert excinfo.value.properties == ["name"] def test_cannot_assign_to_malware_attributes(malware): @@ -98,8 +100,8 @@ def test_invalid_kwarg_to_malware(): stix2.Malware(my_custom_property="foo", **MALWARE_KWARGS) assert excinfo.value.cls == stix2.Malware - assert excinfo.value.fields == ['my_custom_property'] - assert str(excinfo.value) == "Unexpected field(s) for Malware: (my_custom_property)." + assert excinfo.value.properties == ['my_custom_property'] + assert str(excinfo.value) == "Unexpected properties for Malware: (my_custom_property)." @pytest.mark.parametrize("data", [ diff --git a/stix2/test/test_markings.py b/stix2/test/test_markings.py index b3cc479..f1f07db 100644 --- a/stix2/test/test_markings.py +++ b/stix2/test/test_markings.py @@ -2,11 +2,13 @@ import datetime as dt import pytest import pytz + import stix2 from stix2.other import TLP_WHITE from .constants import MARKING_DEFINITION_ID + EXPECTED_TLP_MARKING_DEFINITION = """{ "created": "2017-01-20T00:00:00Z", "definition": { diff --git a/stix2/test/test_observed_data.py b/stix2/test/test_observed_data.py index 52dc15b..6a56963 100644 --- a/stix2/test/test_observed_data.py +++ b/stix2/test/test_observed_data.py @@ -1,11 +1,14 @@ import datetime as dt +import re import pytest import pytz + import stix2 from .constants import OBSERVED_DATA_ID + EXPECTED = """{ "created": "2016-04-06T19:58:16Z", "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", @@ -16,6 +19,7 @@ EXPECTED = """{ "number_observed": 50, "objects": { "0": { + "name": "foo.exe", "type": "file" } }, @@ -34,7 +38,8 @@ def test_observed_data_example(): number_observed=50, objects={ "0": { - "type": "file", + "name": "foo.exe", + "type": "file" }, }, ) @@ -42,6 +47,84 @@ def test_observed_data_example(): assert str(observed_data) == EXPECTED +EXPECTED_WITH_REF = """{ + "created": "2016-04-06T19:58:16Z", + "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + "first_observed": "2015-12-21T19:00:00Z", + "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + "last_observed": "2015-12-21T19:00:00Z", + "modified": "2016-04-06T19:58:16Z", + "number_observed": 50, + "objects": { + "0": { + "name": "foo.exe", + "type": "file" + }, + "1": { + "contains_refs": [ + "0" + ], + "path": "/usr/home", + "type": "directory" + } + }, + "type": "observed-data" +}""" + + +def test_observed_data_example_with_refs(): + observed_data = stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16Z", + modified="2016-04-06T19:58:16Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects={ + "0": { + "name": "foo.exe", + "type": "file" + }, + "1": { + "type": "directory", + "path": "/usr/home", + "contains_refs": ["0"] + } + }, + ) + + assert str(observed_data) == EXPECTED_WITH_REF + + +def test_observed_data_example_with_bad_refs(): + with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: + stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16Z", + modified="2016-04-06T19:58:16Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects={ + "0": { + "type": "file", + "name": "foo.exe" + }, + "1": { + "type": "directory", + "path": "/usr/home", + "contains_refs": ["2"] + } + }, + ) + + assert excinfo.value.cls == stix2.ObservedData + assert excinfo.value.prop_name == "objects" + assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope" + + @pytest.mark.parametrize("data", [ EXPECTED, { @@ -55,6 +138,7 @@ def test_observed_data_example(): "number_observed": 50, "objects": { "0": { + "name": "foo.exe", "type": "file" } } @@ -70,6 +154,913 @@ def test_parse_observed_data(data): assert odata.first_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc) assert odata.last_observed == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc) assert odata.created_by_ref == "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff" - # assert odata.objects["0"].type == "file" # TODO + assert odata.objects["0"].type == "file" -# TODO: Add other examples + +@pytest.mark.parametrize("data", [ + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "payload_bin": "VBORw0KGgoAAAANSUhEUgAAADI==" + }""", + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "url": "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg", + "hashes": { + "MD5": "6826f9a05da08134006557758bb3afbb" + } + }""", +]) +def test_parse_artifact_valid(data): + odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED) + odata = stix2.parse(odata_str) + assert odata.objects["0"].type == "artifact" + + +@pytest.mark.parametrize("data", [ + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "payload_bin": "abcVBORw0KGgoAAAANSUhEUgAAADI==" + }""", + """"0": { + "type": "artifact", + "mime_type": "image/jpeg", + "url": "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg", + "hashes": { + "MD5": "a" + } + }""", +]) +def test_parse_artifact_invalid(data): + odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED) + with pytest.raises(ValueError): + stix2.parse(odata_str) + + +@pytest.mark.parametrize("data", [ + """"0": { + "type": "autonomous-system", + "number": 15139, + "name": "Slime Industries", + "rir": "ARIN" + }""", +]) +def test_parse_autonomous_system_valid(data): + odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED) + odata = stix2.parse(odata_str) + assert odata.objects["0"].type == "autonomous-system" + assert odata.objects["0"].number == 15139 + assert odata.objects["0"].name == "Slime Industries" + assert odata.objects["0"].rir == "ARIN" + + +@pytest.mark.parametrize("data", [ + """{ + "type": "email-address", + "value": "john@example.com", + "display_name": "John Doe", + "belongs_to_ref": "0" + }""", +]) +def test_parse_email_address(data): + odata = stix2.parse_observable(data, {"0": "user-account"}) + assert odata.type == "email-address" + + odata_str = re.compile('"belongs_to_ref": "0"', re.DOTALL).sub('"belongs_to_ref": "3"', data) + with pytest.raises(stix2.exceptions.InvalidObjRefError): + stix2.parse_observable(odata_str, {"0": "user-account"}) + + +@pytest.mark.parametrize("data", [ + """ + { + "type": "email-message", + "is_multipart": true, + "content_type": "multipart/mixed", + "date": "2016-06-19T14:20:40.000Z", + "from_ref": "1", + "to_refs": [ + "2" + ], + "cc_refs": [ + "3" + ], + "subject": "Check out this picture of a cat!", + "additional_header_fields": { + "Content-Disposition": "inline", + "X-Mailer": "Mutt/1.5.23", + "X-Originating-IP": "198.51.100.3" + }, + "body_multipart": [ + { + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!" + }, + { + "content_type": "image/png", + "content_disposition": "attachment; filename=\\"tabby.png\\"", + "body_raw_ref": "4" + }, + { + "content_type": "application/zip", + "content_disposition": "attachment; filename=\\"tabby_pics.zip\\"", + "body_raw_ref": "5" + } + ] + } + """ +]) +def test_parse_email_message(data): + valid_refs = { + "0": "email-message", + "1": "email-addr", + "2": "email-addr", + "3": "email-addr", + "4": "artifact", + "5": "file", + } + odata = stix2.parse_observable(data, valid_refs) + assert odata.type == "email-message" + assert odata.body_multipart[0].content_disposition == "inline" + + +@pytest.mark.parametrize("data", [ + """"0": { + "type": "file", + "hashes": { + "SHA-256": "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a" + } + }, + "1": { + "type": "file", + "hashes": { + "SHA-256": "19c549ec2628b989382f6b280cbd7bb836a0b461332c0fe53511ce7d584b89d3" + } + }, + "2": { + "type": "file", + "hashes": { + "SHA-256": "0969de02ecf8a5f003e3f6d063d848c8a193aada092623f8ce408c15bcb5f038" + } + }, + "3": { + "type": "file", + "name": "foo.zip", + "hashes": { + "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f" + }, + "mime_type": "application/zip", + "extensions": { + "archive-ext": { + "contains_refs": [ + "0", + "1", + "2" + ], + "version": "5.0" + } + } + }""", +]) +def test_parse_file_archive(data): + odata_str = re.compile('"objects".+\},', re.DOTALL).sub('"objects": { %s },' % data, EXPECTED) + odata = stix2.parse(odata_str) + assert odata.objects["3"].extensions['archive-ext'].version == "5.0" + + +@pytest.mark.parametrize("data", [ + """ + { + "type": "email-message", + "is_multipart": true, + "content_type": "multipart/mixed", + "date": "2016-06-19T14:20:40.000Z", + "from_ref": "1", + "to_refs": [ + "2" + ], + "cc_refs": [ + "3" + ], + "subject": "Check out this picture of a cat!", + "additional_header_fields": { + "Content-Disposition": "inline", + "X-Mailer": "Mutt/1.5.23", + "X-Originating-IP": "198.51.100.3" + }, + "body_multipart": [ + { + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!" + }, + { + "content_type": "image/png", + "content_disposition": "attachment; filename=\\"tabby.png\\"" + }, + { + "content_type": "application/zip", + "content_disposition": "attachment; filename=\\"tabby_pics.zip\\"", + "body_raw_ref": "5" + } + ] + } + """ +]) +def test_parse_email_message_with_at_least_one_error(data): + valid_refs = { + "0": "email-message", + "1": "email-addr", + "2": "email-addr", + "3": "email-addr", + "4": "artifact", + "5": "file", + } + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.parse_observable(data, valid_refs) + + assert excinfo.value.cls == stix2.EmailMIMEComponent + assert excinfo.value.properties == ["body", "body_raw_ref"] + + +@pytest.mark.parametrize("data", [ + """ + { + "type": "network-traffic", + "src_ref": "0", + "dst_ref": "1", + "protocols": [ + "tcp" + ] + } + """ +]) +def test_parse_basic_tcp_traffic(data): + odata = stix2.parse_observable(data, {"0": "ipv4-addr", "1": "ipv4-addr"}) + + assert odata.type == "network-traffic" + assert odata.src_ref == "0" + assert odata.dst_ref == "1" + assert odata.protocols == ["tcp"] + + +@pytest.mark.parametrize("data", [ + """ + { + "type": "network-traffic", + "src_port": 2487, + "dst_port": 1723, + "protocols": [ + "ipv4", + "pptp" + ], + "src_byte_count": 35779, + "dst_byte_count": 935750, + "encapsulates_refs": [ + "4" + ] + } + """ +]) +def test_parse_basic_tcp_traffic_with_error(data): + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.parse_observable(data, {"4": "network-traffic"}) + + assert excinfo.value.cls == stix2.NetworkTraffic + assert excinfo.value.properties == ["dst_ref", "src_ref"] + + +EXPECTED_PROCESS_OD = """{ + "created": "2016-04-06T19:58:16Z", + "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + "first_observed": "2015-12-21T19:00:00Z", + "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + "last_observed": "2015-12-21T19:00:00Z", + "modified": "2016-04-06T19:58:16Z", + "number_observed": 50, + "objects": { + "0": { + "type": "file", + "hashes": { + "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100fSHA" + }, + }, + "1": { + "type": "process", + "pid": 1221, + "name": "gedit-bin", + "created": "2016-01-20T14:11:25.55Z", + "arguments" :[ + "--new-window" + ], + "binary_ref": "0" + } + }, + "type": "observed-data" +}""" + + +def test_observed_data_with_process_example(): + observed_data = stix2.ObservedData( + id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", + created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", + created="2016-04-06T19:58:16Z", + modified="2016-04-06T19:58:16Z", + first_observed="2015-12-21T19:00:00Z", + last_observed="2015-12-21T19:00:00Z", + number_observed=50, + objects={ + "0": { + "type": "file", + "hashes": { + "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f" + }, + }, + "1": { + "type": "process", + "pid": 1221, + "name": "gedit-bin", + "created": "2016-01-20T14:11:25.55Z", + "arguments": [ + "--new-window" + ], + "binary_ref": "0" + } + }) + + assert observed_data.objects["0"].type == "file" + assert observed_data.objects["0"].hashes["SHA-256"] == "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f" + assert observed_data.objects["1"].type == "process" + assert observed_data.objects["1"].pid == 1221 + assert observed_data.objects["1"].name == "gedit-bin" + assert observed_data.objects["1"].arguments[0] == "--new-window" + + +# creating cyber observables directly + +def test_artifact_example(): + art = stix2.Artifact(mime_type="image/jpeg", + url="https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg", + hashes={ + "MD5": "6826f9a05da08134006557758bb3afbb" + }) + assert art.mime_type == "image/jpeg" + assert art.url == "https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg" + assert art.hashes["MD5"] == "6826f9a05da08134006557758bb3afbb" + + +def test_artifact_mutual_exclusion_error(): + with pytest.raises(stix2.exceptions.MutuallyExclusivePropertiesError) as excinfo: + stix2.Artifact(mime_type="image/jpeg", + url="https://upload.wikimedia.org/wikipedia/commons/b/b4/JPEG_example_JPG_RIP_100.jpg", + hashes={ + "MD5": "6826f9a05da08134006557758bb3afbb" + }, + payload_bin="VBORw0KGgoAAAANSUhEUgAAADI==") + + assert excinfo.value.cls == stix2.Artifact + assert excinfo.value.properties == ["payload_bin", "url"] + + +def test_directory_example(): + dir = stix2.Directory(_valid_refs={"1": "file"}, + path='/usr/lib', + created="2015-12-21T19:00:00Z", + modified="2015-12-24T19:00:00Z", + accessed="2015-12-21T20:00:00Z", + contains_refs=["1"]) + + assert dir.path == '/usr/lib' + assert dir.created == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc) + assert dir.modified == dt.datetime(2015, 12, 24, 19, 0, 0, tzinfo=pytz.utc) + assert dir.accessed == dt.datetime(2015, 12, 21, 20, 0, 0, tzinfo=pytz.utc) + assert dir.contains_refs == ["1"] + + +def test_directory_example_ref_error(): + with pytest.raises(stix2.exceptions.InvalidObjRefError) as excinfo: + stix2.Directory(_valid_refs=[], + path='/usr/lib', + created="2015-12-21T19:00:00Z", + modified="2015-12-24T19:00:00Z", + accessed="2015-12-21T20:00:00Z", + contains_refs=["1"]) + + assert excinfo.value.cls == stix2.Directory + assert excinfo.value.prop_name == "contains_refs" + + +def test_domain_name_example(): + dn = stix2.DomainName(_valid_refs={"1": 'domain-name'}, + value="example.com", + resolves_to_refs=["1"]) + + assert dn.value == "example.com" + assert dn.resolves_to_refs == ["1"] + + +def test_domain_name_example_invalid_ref_type(): + with pytest.raises(stix2.exceptions.InvalidObjRefError) as excinfo: + stix2.DomainName(_valid_refs={"1": "file"}, + value="example.com", + resolves_to_refs=["1"]) + + assert excinfo.value.cls == stix2.DomainName + assert excinfo.value.prop_name == "resolves_to_refs" + + +def test_file_example(): + f = stix2.File(name="qwerty.dll", + hashes={ + "SHA-256": "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a"}, + size=100, + magic_number_hex="1C", + mime_type="application/msword", + created="2016-12-21T19:00:00Z", + modified="2016-12-24T19:00:00Z", + accessed="2016-12-21T20:00:00Z", + is_encrypted=True, + encryption_algorithm="AES128-CBC", + decryption_key="fred" + ) + + assert f.name == "qwerty.dll" + assert f.size == 100 + assert f.magic_number_hex == "1C" + assert f.hashes["SHA-256"] == "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a" + assert f.mime_type == "application/msword" + assert f.created == dt.datetime(2016, 12, 21, 19, 0, 0, tzinfo=pytz.utc) + assert f.modified == dt.datetime(2016, 12, 24, 19, 0, 0, tzinfo=pytz.utc) + assert f.accessed == dt.datetime(2016, 12, 21, 20, 0, 0, tzinfo=pytz.utc) + assert f.is_encrypted + assert f.encryption_algorithm == "AES128-CBC" + assert f.decryption_key == "fred" # does the key have a format we can test for? + + +def test_file_example_with_NTFSExt(): + f = stix2.File(name="abc.txt", + extensions={ + "ntfs-ext": { + "alternate_data_streams": [ + { + "name": "second.stream", + "size": 25536 + } + ] + } + }) + + assert f.name == "abc.txt" + assert f.extensions["ntfs-ext"].alternate_data_streams[0].size == 25536 + + +def test_file_example_with_empty_NTFSExt(): + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.File(name="abc.txt", + extensions={ + "ntfs-ext": { + } + }) + + assert excinfo.value.cls == stix2.NTFSExt + assert excinfo.value.properties == sorted(list(stix2.NTFSExt._properties.keys())) + + +def test_file_example_with_PDFExt(): + f = stix2.File(name="qwerty.dll", + extensions={ + "pdf-ext": { + "version": "1.7", + "document_info_dict": { + "Title": "Sample document", + "Author": "Adobe Systems Incorporated", + "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", + "Producer": "Acrobat Distiller 3.01 for Power Macintosh", + "CreationDate": "20070412090123-02" + }, + "pdfid0": "DFCE52BD827ECF765649852119D", + "pdfid1": "57A1E0F9ED2AE523E313C" + } + }) + + assert f.name == "qwerty.dll" + assert f.extensions["pdf-ext"].version == "1.7" + assert f.extensions["pdf-ext"].document_info_dict["Title"] == "Sample document" + + +def test_file_example_with_PDFExt_Object(): + f = stix2.File(name="qwerty.dll", + extensions={ + "pdf-ext": + stix2.PDFExt(version="1.7", + document_info_dict={ + "Title": "Sample document", + "Author": "Adobe Systems Incorporated", + "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", + "Producer": "Acrobat Distiller 3.01 for Power Macintosh", + "CreationDate": "20070412090123-02" + }, + pdfid0="DFCE52BD827ECF765649852119D", + pdfid1="57A1E0F9ED2AE523E313C") + + }) + + assert f.name == "qwerty.dll" + assert f.extensions["pdf-ext"].version == "1.7" + assert f.extensions["pdf-ext"].document_info_dict["Title"] == "Sample document" + + +def test_file_example_with_RasterImageExt_Object(): + f = stix2.File(name="qwerty.jpeg", + extensions={ + "raster-image-ext": { + "bits_per_pixel": 123, + "exif_tags": { + "Make": "Nikon", + "Model": "D7000", + "XResolution": 4928, + "YResolution": 3264 + } + } + }) + assert f.name == "qwerty.jpeg" + assert f.extensions["raster-image-ext"].bits_per_pixel == 123 + assert f.extensions["raster-image-ext"].exif_tags["XResolution"] == 4928 + + +def test_file_example_with_WindowsPEBinaryExt(): + f = stix2.File(name="qwerty.dll", + extensions={ + "windows-pebinary-ext": { + "pe_type": "exe", + "machine_hex": "014c", + "number_of_sections": 4, + "time_date_stamp": "2016-01-22T12:31:12Z", + "pointer_to_symbol_table_hex": "74726144", + "number_of_symbols": 4542568, + "size_of_optional_header": 224, + "characteristics_hex": "818f", + "optional_header": { + "magic_hex": "010b", + "major_linker_version": 2, + "minor_linker_version": 25, + "size_of_code": 512, + "size_of_initialized_data": 283648, + "size_of_uninitialized_data": 0, + "address_of_entry_point": 4096, + "base_of_code": 4096, + "base_of_data": 8192, + "image_base": 14548992, + "section_alignment": 4096, + "file_alignment": 4096, + "major_os_version": 1, + "minor_os_version": 0, + "major_image_version": 0, + "minor_image_version": 0, + "major_subsystem_version": 4, + "minor_subsystem_version": 0, + "win32_version_value_hex": "00", + "size_of_image": 299008, + "size_of_headers": 4096, + "checksum_hex": "00", + "subsystem_hex": "03", + "dll_characteristics_hex": "00", + "size_of_stack_reserve": 100000, + "size_of_stack_commit": 8192, + "size_of_heap_reserve": 100000, + "size_of_heap_commit": 4096, + "loader_flags_hex": "abdbffde", + "number_of_rva_and_sizes": 3758087646 + }, + "sections": [ + { + "name": "CODE", + "entropy": 0.061089 + }, + { + "name": "DATA", + "entropy": 7.980693 + }, + { + "name": "NicolasB", + "entropy": 0.607433 + }, + { + "name": ".idata", + "entropy": 0.607433 + } + ] + } + + }) + assert f.name == "qwerty.dll" + assert f.extensions["windows-pebinary-ext"].sections[2].entropy == 0.607433 + + +def test_file_example_encryption_error(): + with pytest.raises(stix2.exceptions.DependentPropertiestError) as excinfo: + stix2.File(name="qwerty.dll", + is_encrypted=False, + encryption_algorithm="AES128-CBC" + ) + + assert excinfo.value.cls == stix2.File + assert excinfo.value.dependencies == [("is_encrypted", "encryption_algorithm")] + + +def test_ip4_address_example(): + ip4 = stix2.IPv4Address(_valid_refs={"4": "mac-addr", "5": "mac-addr"}, + value="198.51.100.3", + resolves_to_refs=["4", "5"]) + + assert ip4.value == "198.51.100.3" + assert ip4.resolves_to_refs == ["4", "5"] + + +def test_ip4_address_example_cidr(): + ip4 = stix2.IPv4Address(value="198.51.100.0/24") + + assert ip4.value == "198.51.100.0/24" + + +def test_ip6_address_example(): + ip6 = stix2.IPv6Address(value="2001:0db8:85a3:0000:0000:8a2e:0370:7334") + + assert ip6.value == "2001:0db8:85a3:0000:0000:8a2e:0370:7334" + + +def test_mac_address_example(): + ip6 = stix2.MACAddress(value="d2:fb:49:24:37:18") + + assert ip6.value == "d2:fb:49:24:37:18" + + +def test_network_traffic_example(): + nt = stix2.NetworkTraffic(_valid_refs={"0": "ipv4-addr", "1": "ipv4-addr"}, + protocols="tcp", + src_ref="0", + dst_ref="1") + assert nt.protocols == ["tcp"] + assert nt.src_ref == "0" + assert nt.dst_ref == "1" + + +def test_network_traffic_http_request_example(): + h = stix2.HTTPRequestExt(request_method="get", + request_value="/download.html", + request_version="http/1.1", + request_header={ + "Accept-Encoding": "gzip,deflate", + "User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113", + "Host": "www.example.com" + }) + nt = stix2.NetworkTraffic(_valid_refs={"0": "ipv4-addr"}, + protocols="tcp", + src_ref="0", + extensions={'http-request-ext': h}) + assert nt.extensions['http-request-ext'].request_method == "get" + assert nt.extensions['http-request-ext'].request_value == "/download.html" + assert nt.extensions['http-request-ext'].request_version == "http/1.1" + assert nt.extensions['http-request-ext'].request_header['Accept-Encoding'] == "gzip,deflate" + assert nt.extensions['http-request-ext'].request_header['User-Agent'] == "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113" + assert nt.extensions['http-request-ext'].request_header['Host'] == "www.example.com" + + +def test_network_traffic_icmp_example(): + h = stix2.ICMPExt(icmp_type_hex="08", + icmp_code_hex="00") + nt = stix2.NetworkTraffic(_valid_refs={"0": "ipv4-addr"}, + protocols="tcp", + src_ref="0", + extensions={'icmp-ext': h}) + assert nt.extensions['icmp-ext'].icmp_type_hex == "08" + assert nt.extensions['icmp-ext'].icmp_code_hex == "00" + + +def test_network_traffic_socket_example(): + h = stix2.SocketExt(is_listening=True, + address_family="AF_INET", + protocol_family="PF_INET", + socket_type="SOCK_STREAM") + nt = stix2.NetworkTraffic(_valid_refs={"0": "ipv4-addr"}, + protocols="tcp", + src_ref="0", + extensions={'socket-ext': h}) + assert nt.extensions['socket-ext'].is_listening + assert nt.extensions['socket-ext'].address_family == "AF_INET" + assert nt.extensions['socket-ext'].protocol_family == "PF_INET" + assert nt.extensions['socket-ext'].socket_type == "SOCK_STREAM" + + +def test_network_traffic_tcp_example(): + h = stix2.TCPExt(src_flags_hex="00000002") + nt = stix2.NetworkTraffic(_valid_refs={"0": "ipv4-addr"}, + protocols="tcp", + src_ref="0", + extensions={'tcp-ext': h}) + assert nt.extensions['tcp-ext'].src_flags_hex == "00000002" + + +def test_mutex_example(): + m = stix2.Mutex(name="barney") + + assert m.name == "barney" + + +def test_process_example(): + p = stix2.Process(_valid_refs={"0": "file"}, + pid=1221, + name="gedit-bin", + created="2016-01-20T14:11:25.55Z", + arguments=["--new-window"], + binary_ref="0") + + assert p.name == "gedit-bin" + assert p.arguments == ["--new-window"] + + +def test_process_example_empty_error(): + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.Process() + + assert excinfo.value.cls == stix2.Process + properties_of_process = list(stix2.Process._properties.keys()) + properties_of_process.remove("type") + assert excinfo.value.properties == sorted(properties_of_process) + + +def test_process_example_empty_with_extensions(): + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.Process(extensions={ + "windows-process-ext": {} + }) + + assert excinfo.value.cls == stix2.WindowsProcessExt + properties_of_extension = list(stix2.WindowsProcessExt._properties.keys()) + assert excinfo.value.properties == sorted(properties_of_extension) + + +def test_process_example_windows_process_ext_empty(): + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.Process(pid=1221, + name="gedit-bin", + extensions={ + "windows-process-ext": {} + }) + + assert excinfo.value.cls == stix2.WindowsProcessExt + properties_of_extension = list(stix2.WindowsProcessExt._properties.keys()) + assert excinfo.value.properties == sorted(properties_of_extension) + + +def test_process_example_extensions_empty(): + with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo: + stix2.Process(extensions={ + }) + + assert excinfo.value.cls == stix2.Process + properties_of_process = list(stix2.Process._properties.keys()) + properties_of_process.remove("type") + assert excinfo.value.properties == sorted(properties_of_process) + + +def test_process_example_with_WindowsProcessExt_Object(): + p = stix2.Process(extensions={ + "windows-process-ext": stix2.WindowsProcessExt(aslr_enabled=True, + dep_enabled=True, + priority="HIGH_PRIORITY_CLASS", + owner_sid="S-1-5-21-186985262-1144665072-74031268-1309") # noqa + }) + + assert p.extensions["windows-process-ext"].dep_enabled + assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309" + + +def test_process_example_with_WindowsServiceExt(): + p = stix2.Process(extensions={ + "windows-service-ext": { + "service_name": "sirvizio", + "display_name": "Sirvizio", + "start_type": "SERVICE_AUTO_START", + "service_type": "SERVICE_WIN32_OWN_PROCESS", + "service_status": "SERVICE_RUNNING" + } + }) + + assert p.extensions["windows-service-ext"].service_name == "sirvizio" + assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS" + + +def test_process_example_with_WindowsProcessServiceExt(): + p = stix2.Process(extensions={ + "windows-service-ext": { + "service_name": "sirvizio", + "display_name": "Sirvizio", + "start_type": "SERVICE_AUTO_START", + "service_type": "SERVICE_WIN32_OWN_PROCESS", + "service_status": "SERVICE_RUNNING" + }, + "windows-process-ext": { + "aslr_enabled": True, + "dep_enabled": True, + "priority": "HIGH_PRIORITY_CLASS", + "owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309" + } + }) + + assert p.extensions["windows-service-ext"].service_name == "sirvizio" + assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS" + assert p.extensions["windows-process-ext"].dep_enabled + assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309" + + +def test_software_example(): + s = stix2.Software(name="Word", + cpe="cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*", + version="2002", + vendor="Microsoft") + + assert s.name == "Word" + assert s.cpe == "cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*" + assert s.version == "2002" + assert s.vendor == "Microsoft" + + +def test_url_example(): + s = stix2.URL(value="https://example.com/research/index.html") + + assert s.type == "url" + assert s.value == "https://example.com/research/index.html" + + +def test_user_account_example(): + a = stix2.UserAccount(user_id="1001", + account_login="jdoe", + account_type="unix", + display_name="John Doe", + is_service_account=False, + is_privileged=False, + can_escalate_privs=True, + account_created="2016-01-20T12:31:12Z", + password_last_changed="2016-01-20T14:27:43Z", + account_first_login="2016-01-20T14:26:07Z", + account_last_login="2016-07-22T16:08:28Z") + + assert a.user_id == "1001" + assert a.account_login == "jdoe" + assert a.account_type == "unix" + assert a.display_name == "John Doe" + assert not a.is_service_account + assert not a.is_privileged + assert a.can_escalate_privs + assert a.account_created == dt.datetime(2016, 1, 20, 12, 31, 12, tzinfo=pytz.utc) + assert a.password_last_changed == dt.datetime(2016, 1, 20, 14, 27, 43, tzinfo=pytz.utc) + assert a.account_first_login == dt.datetime(2016, 1, 20, 14, 26, 7, tzinfo=pytz.utc) + assert a.account_last_login == dt.datetime(2016, 7, 22, 16, 8, 28, tzinfo=pytz.utc) + + +def test_user_account_unix_account_ext_example(): + u = stix2.UNIXAccountExt(gid=1001, + groups=["wheel"], + home_dir="/home/jdoe", + shell="/bin/bash") + a = stix2.UserAccount(user_id="1001", + account_login="jdoe", + account_type="unix", + extensions={'unix-account-ext': u}) + assert a.extensions['unix-account-ext'].gid == 1001 + assert a.extensions['unix-account-ext'].groups == ["wheel"] + assert a.extensions['unix-account-ext'].home_dir == "/home/jdoe" + assert a.extensions['unix-account-ext'].shell == "/bin/bash" + + +def test_windows_registry_key_example(): + with pytest.raises(ValueError): + v = stix2.WindowsRegistryValueType(name="Foo", + data="qwerty", + data_type="string") + + v = stix2.WindowsRegistryValueType(name="Foo", + data="qwerty", + data_type="REG_SZ") + w = stix2.WindowsRegistryKey(key="hkey_local_machine\\system\\bar\\foo", + values=[v]) + assert w.key == "hkey_local_machine\\system\\bar\\foo" + assert w.values[0].name == "Foo" + assert w.values[0].data == "qwerty" + assert w.values[0].data_type == "REG_SZ" + + +def test_x509_certificate_example(): + x509 = stix2.X509Certificate( + issuer="C=ZA, ST=Western Cape, L=Cape Town, O=Thawte Consulting cc, OU=Certification Services Division, CN=Thawte Server CA/emailAddress=server-certs@thawte.com", # noqa + validity_not_before="2016-03-12T12:00:00Z", + validity_not_after="2016-08-21T12:00:00Z", + subject="C=US, ST=Maryland, L=Pasadena, O=Brent Baccala, OU=FreeSoft, CN=www.freesoft.org/emailAddress=baccala@freesoft.org") # noqa + + assert x509.type == "x509-certificate" + assert x509.issuer == "C=ZA, ST=Western Cape, L=Cape Town, O=Thawte Consulting cc, OU=Certification Services Division, CN=Thawte Server CA/emailAddress=server-certs@thawte.com" # noqa + assert x509.subject == "C=US, ST=Maryland, L=Pasadena, O=Brent Baccala, OU=FreeSoft, CN=www.freesoft.org/emailAddress=baccala@freesoft.org" # noqa diff --git a/stix2/test/test_properties.py b/stix2/test/test_properties.py index e83b2fc..59e0dd9 100644 --- a/stix2/test/test_properties.py +++ b/stix2/test/test_properties.py @@ -1,8 +1,15 @@ import pytest -from stix2.properties import (BooleanProperty, IDProperty, IntegerProperty, +from stix2 import TCPExt +from stix2.exceptions import AtLeastOnePropertyError, DictionaryKeyError +from stix2.observables import EmailMIMEComponent +from stix2.properties import (BinaryProperty, BooleanProperty, + DictionaryProperty, EmbeddedObjectProperty, + EnumProperty, ExtensionsProperty, HashesProperty, + HexProperty, IDProperty, IntegerProperty, ListProperty, Property, ReferenceProperty, StringProperty, TimestampProperty, TypeProperty) + from .constants import FAKE_TIME @@ -171,3 +178,118 @@ def test_timestamp_property_invalid(): ts_prop.clean(1) with pytest.raises(ValueError): ts_prop.clean("someday sometime") + + +def test_binary_property(): + bin_prop = BinaryProperty() + + assert bin_prop.clean("TG9yZW0gSXBzdW0=") + with pytest.raises(ValueError): + bin_prop.clean("foobar") + + +def test_hex_property(): + hex_prop = HexProperty() + + assert hex_prop.clean("4c6f72656d20497073756d") + with pytest.raises(ValueError): + hex_prop.clean("foobar") + + +@pytest.mark.parametrize("d", [ + {'description': 'something'}, + [('abc', 1), ('bcd', 2), ('cde', 3)], +]) +def test_dictionary_property_valid(d): + dict_prop = DictionaryProperty() + assert dict_prop.clean(d) + + +@pytest.mark.parametrize("d", [ + {'a': 'something'}, + {'a'*300: 'something'}, + {'Hey!': 'something'}, +]) +def test_dictionary_property_invalid(d): + dict_prop = DictionaryProperty() + + with pytest.raises(DictionaryKeyError): + dict_prop.clean(d) + + +@pytest.mark.parametrize("value", [ + {"sha256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b"}, + [('MD5', '2dfb1bcc980200c6706feee399d41b3f'), ('RIPEMD-160', 'b3a8cd8a27c90af79b3c81754f267780f443dfef')], +]) +def test_hashes_property_valid(value): + hash_prop = HashesProperty() + assert hash_prop.clean(value) + + +@pytest.mark.parametrize("value", [ + {"MD5": "a"}, + {"SHA-256": "2dfb1bcc980200c6706feee399d41b3f"}, +]) +def test_hashes_property_invalid(value): + hash_prop = HashesProperty() + + with pytest.raises(ValueError): + hash_prop.clean(value) + + +def test_embedded_property(): + emb_prop = EmbeddedObjectProperty(type=EmailMIMEComponent) + mime = EmailMIMEComponent( + content_type="text/plain; charset=utf-8", + content_disposition="inline", + body="Cats are funny!" + ) + assert emb_prop.clean(mime) + + with pytest.raises(ValueError): + emb_prop.clean("string") + + +def test_enum_property(): + enum_prop = EnumProperty(['a', 'b', 'c']) + assert enum_prop.clean('b') + + with pytest.raises(ValueError): + enum_prop.clean('z') + + +def test_extension_property_valid(): + ext_prop = ExtensionsProperty(enclosing_type='file') + assert ext_prop({ + 'windows-pebinary-ext': { + 'pe_type': 'exe' + }, + }) + + +@pytest.mark.parametrize("data", [ + 1, + {'foobar-ext': { + 'pe_type': 'exe' + }}, +]) +def test_extension_property_invalid(data): + ext_prop = ExtensionsProperty(enclosing_type='file') + with pytest.raises(ValueError): + ext_prop.clean(data) + + +def test_extension_property_invalid_type(): + ext_prop = ExtensionsProperty(enclosing_type='indicator') + with pytest.raises(ValueError) as excinfo: + ext_prop.clean({ + 'windows-pebinary-ext': { + 'pe_type': 'exe' + }} + ) + assert 'no extensions defined' in str(excinfo.value) + + +def test_extension_at_least_one_property_constraint(): + with pytest.raises(AtLeastOnePropertyError): + TCPExt() diff --git a/stix2/test/test_relationship.py b/stix2/test/test_relationship.py index 869107d..c4f9ebe 100644 --- a/stix2/test/test_relationship.py +++ b/stix2/test/test_relationship.py @@ -5,8 +5,8 @@ import pytz import stix2 -from .constants import FAKE_TIME, INDICATOR_ID, MALWARE_ID, RELATIONSHIP_ID -from .constants import RELATIONSHIP_KWARGS +from .constants import (FAKE_TIME, INDICATOR_ID, MALWARE_ID, RELATIONSHIP_ID, + RELATIONSHIP_KWARGS) EXPECTED_RELATIONSHIP = """{ @@ -20,7 +20,7 @@ EXPECTED_RELATIONSHIP = """{ }""" -def test_relationship_all_required_fields(): +def test_relationship_all_required_properties(): now = dt.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc) rel = stix2.Relationship( @@ -35,7 +35,7 @@ def test_relationship_all_required_fields(): assert str(rel) == EXPECTED_RELATIONSHIP -def test_relationship_autogenerated_fields(relationship): +def test_relationship_autogenerated_properties(relationship): assert relationship.type == 'relationship' assert relationship.id == 'relationship--00000000-0000-0000-0000-000000000001' assert relationship.created == FAKE_TIME @@ -73,22 +73,22 @@ def test_relationship_id_must_start_with_relationship(): assert str(excinfo.value) == "Invalid value for Relationship 'id': must start with 'relationship--'." -def test_relationship_required_field_relationship_type(): +def test_relationship_required_property_relationship_type(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Relationship() assert excinfo.value.cls == stix2.Relationship - assert excinfo.value.fields == ["relationship_type", "source_ref", "target_ref"] + assert excinfo.value.properties == ["relationship_type", "source_ref", "target_ref"] -def test_relationship_missing_some_required_fields(): +def test_relationship_missing_some_required_properties(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Relationship(relationship_type='indicates') assert excinfo.value.cls == stix2.Relationship - assert excinfo.value.fields == ["source_ref", "target_ref"] + assert excinfo.value.properties == ["source_ref", "target_ref"] -def test_relationship_required_field_target_ref(): +def test_relationship_required_properties_target_ref(): with pytest.raises(stix2.exceptions.MissingFieldsError) as excinfo: stix2.Relationship( relationship_type='indicates', @@ -96,7 +96,7 @@ def test_relationship_required_field_target_ref(): ) assert excinfo.value.cls == stix2.Relationship - assert excinfo.value.fields == ["target_ref"] + assert excinfo.value.properties == ["target_ref"] def test_cannot_assign_to_relationship_attributes(relationship): @@ -111,8 +111,8 @@ def test_invalid_kwarg_to_relationship(): stix2.Relationship(my_custom_property="foo", **RELATIONSHIP_KWARGS) assert excinfo.value.cls == stix2.Relationship - assert excinfo.value.fields == ['my_custom_property'] - assert str(excinfo.value) == "Unexpected field(s) for Relationship: (my_custom_property)." + assert excinfo.value.properties == ['my_custom_property'] + assert str(excinfo.value) == "Unexpected properties for Relationship: (my_custom_property)." def test_create_relationship_from_objects_rather_than_ids(indicator, malware): diff --git a/stix2/test/test_report.py b/stix2/test/test_report.py index 46a0a16..cec217a 100644 --- a/stix2/test/test_report.py +++ b/stix2/test/test_report.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import INDICATOR_KWARGS, REPORT_ID + EXPECTED = """{ "created": "2015-12-21T19:59:11Z", "created_by_ref": "identity--a463ffb3-1bd9-4d94-b02d-74e4f1658283", diff --git a/stix2/test/test_sighting.py b/stix2/test/test_sighting.py index 9da6d4e..d8db35e 100644 --- a/stix2/test/test_sighting.py +++ b/stix2/test/test_sighting.py @@ -31,7 +31,7 @@ BAD_SIGHTING = """{ }""" -def test_sighting_all_required_fields(): +def test_sighting_all_required_properties(): now = dt.datetime(2016, 4, 6, 20, 6, 37, tzinfo=pytz.utc) s = stix2.Sighting( @@ -79,8 +79,8 @@ def test_invalid_kwarg_to_sighting(): stix2.Sighting(my_custom_property="foo", **SIGHTING_KWARGS) assert excinfo.value.cls == stix2.Sighting - assert excinfo.value.fields == ['my_custom_property'] - assert str(excinfo.value) == "Unexpected field(s) for Sighting: (my_custom_property)." + assert excinfo.value.properties == ['my_custom_property'] + assert str(excinfo.value) == "Unexpected properties for Sighting: (my_custom_property)." def test_create_sighting_from_objects_rather_than_ids(malware): # noqa: F811 diff --git a/stix2/test/test_threat_actor.py b/stix2/test/test_threat_actor.py index 5844775..93e8179 100644 --- a/stix2/test/test_threat_actor.py +++ b/stix2/test/test_threat_actor.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import THREAT_ACTOR_ID + EXPECTED = """{ "created": "2016-04-06T20:03:48Z", "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", diff --git a/stix2/test/test_tool.py b/stix2/test/test_tool.py index 3193807..d7d3bef 100644 --- a/stix2/test/test_tool.py +++ b/stix2/test/test_tool.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import TOOL_ID + EXPECTED = """{ "created": "2016-04-06T20:03:48Z", "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", diff --git a/stix2/test/test_utils.py b/stix2/test/test_utils.py index 3eee491..a70853c 100644 --- a/stix2/test/test_utils.py +++ b/stix2/test/test_utils.py @@ -1,4 +1,5 @@ import datetime as dt +from io import StringIO import pytest import pytz @@ -17,3 +18,24 @@ eastern = pytz.timezone('US/Eastern') ]) def test_timestamp_formatting(dttm, timestamp): assert stix2.utils.format_datetime(dttm) == timestamp + + +@pytest.mark.parametrize('data', [ + {"a": 1}, + '{"a": 1}', + StringIO(u'{"a": 1}'), + [("a", 1,)], +]) +def test_get_dict(data): + assert stix2.utils.get_dict(data) + + +@pytest.mark.parametrize('data', [ + 1, + [1], + ['a', 1], + "foobar", +]) +def test_get_dict_invalid(data): + with pytest.raises(ValueError): + stix2.utils.get_dict(data) diff --git a/stix2/test/test_versioning.py b/stix2/test/test_versioning.py index 4663263..fc9ae8a 100644 --- a/stix2/test/test_versioning.py +++ b/stix2/test/test_versioning.py @@ -1,4 +1,5 @@ import pytest + import stix2 @@ -141,7 +142,7 @@ def test_versioning_error_usetting_required_property(): campaign_v1.new_version(name=None) assert excinfo.value.cls == stix2.Campaign - assert excinfo.value.fields == ["name"] + assert excinfo.value.properties == ["name"] def test_versioning_error_new_version_of_revoked(): diff --git a/stix2/test/test_vulnerability.py b/stix2/test/test_vulnerability.py index 565f077..751460c 100644 --- a/stix2/test/test_vulnerability.py +++ b/stix2/test/test_vulnerability.py @@ -2,10 +2,12 @@ import datetime as dt import pytest import pytz + import stix2 from .constants import VULNERABILITY_ID + EXPECTED = """{ "created": "2016-05-12T08:17:27Z", "external_references": [ diff --git a/stix2/utils.py b/stix2/utils.py index a2493e2..ed12cdf 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -63,11 +63,17 @@ def get_dict(data): """ if type(data) is dict: - obj = data + return data else: try: - obj = json.loads(data) + return json.loads(data) except TypeError: - obj = json.load(data) - - return obj + pass + try: + return json.load(data) + except AttributeError: + pass + try: + return dict(data) + except (ValueError, TypeError): + raise ValueError("Cannot convert '%s' to dictionary." % str(data)) diff --git a/tox.ini b/tox.ini index 16e46e0..ff386d6 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py26,py27,py33,py34,py35,py36,pycodestyle +envlist = py26,py27,py33,py34,py35,py36,pycodestyle,isort-check [testenv] deps = @@ -18,11 +18,10 @@ passenv = CI TRAVIS TRAVIS_* [testenv:pycodestyle] deps = flake8 - flake8-import-order pycodestyle commands = pycodestyle ./stix2 - flake8 --max-line-length=160 --import-order-style='google' + flake8 --max-line-length=160 [pycodestyle] ignore= @@ -31,6 +30,10 @@ max-line-length=160 [flake8] max-line-length=160 +[testenv:isort-check] +deps = isort +commands = isort -rc stix2 -c -df + [travis] python = 2.6: py26