Merge pull request #11 from rpiazza/cyber-observables

Cyber observables
stix2.1
Chris Lenk 2017-05-15 15:29:59 -04:00 committed by GitHub
commit aa69c38444
6 changed files with 311 additions and 22 deletions

View File

@ -4,12 +4,17 @@
from . import exceptions
from .bundle import Bundle
from .observables import (URL, ArchiveExt, Artifact, AutonomousSystem,
Directory, DomainName, EmailAddress, EmailMessage,
EmailMIMEComponent, File, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, Process, Software,
UserAccount, WindowsRegistryKey,
WindowsRegistryValueType, X509Certificate)
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, Directory, DomainName,
EmailAddress, EmailMessage, EmailMIMEComponent, File,
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType)
from .other import (ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
@ -59,10 +64,34 @@ OBJ_MAP_OBSERVABLE = {
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
}

View File

@ -50,21 +50,16 @@ class _STIXBase(collections.Mapping):
# interproperty constraint methods
def _check_mutually_exclusive_properties(self, list_of_properties, at_least_one=True):
count = 0
current_properties = self.properties_populated()
for x in list_of_properties:
if x in current_properties:
count += 1
count = len(set(list_of_properties).intersection(current_properties))
# at_least_one allows for xor to be checked
if count > 1 or (at_least_one and count == 0):
raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties)
def _check_at_least_one_property(self, list_of_properties):
current_properties = self.properties_populated()
for x in list_of_properties:
if x in current_properties:
return
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
if not set(list_of_properties).intersection(current_properties):
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties, values=[]):
failed_dependency_pairs = []

View File

@ -7,8 +7,8 @@ and do not have a '_type' attribute.
from .base import _Observable, _STIXBase
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, HashesProperty,
HexProperty, IntegerProperty, ListProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty,
HashesProperty, HexProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
@ -119,11 +119,108 @@ class ArchiveExt(_STIXBase):
}
class AlternateDataStream(_STIXBase):
_properties = {
'name': StringProperty(required=True),
'hashes': HashesProperty(),
'size': IntegerProperty(),
}
class NTFSExt(_STIXBase):
_properties = {
'sid': StringProperty(),
'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)),
}
class PDFExt(_STIXBase):
_properties = {
'version': StringProperty(),
'is_optimized': BooleanProperty(),
'document_info_dict': DictionaryProperty(),
'pdfid0': StringProperty(),
'pdfid1': StringProperty(),
}
class RasterImageExt(_STIXBase):
_properties = {
'image_height': IntegerProperty(),
'image_weight': IntegerProperty(),
'bits_per_pixel': IntegerProperty(),
'image_compression_algorithm': StringProperty(),
'exif_tags': DictionaryProperty(),
}
class WindowsPEOptionalHeaderType(_STIXBase):
_properties = {
'magic_hex': HexProperty(),
'major_linker_version': IntegerProperty(),
'minor_linker_version': IntegerProperty(),
'size_of_code': IntegerProperty(),
'size_of_initialized_data': IntegerProperty(),
'size_of_uninitialized_data': IntegerProperty(),
'address_of_entry_point': IntegerProperty(),
'base_of_code': IntegerProperty(),
'base_of_data': IntegerProperty(),
'image_base': IntegerProperty(),
'section_alignment': IntegerProperty(),
'file_alignment': IntegerProperty(),
'major_os_version': IntegerProperty(),
'minor_os_version': IntegerProperty(),
'major_image_version': IntegerProperty(),
'minor_image_version': IntegerProperty(),
'major_subsystem_version': IntegerProperty(),
'minor_subsystem_version': IntegerProperty(),
'win32_version_value_hex': HexProperty(),
'size_of_image': IntegerProperty(),
'size_of_headers': IntegerProperty(),
'checksum_hex': HexProperty(),
'subsystem_hex': HexProperty(),
'dll_characteristics_hex': HexProperty(),
'size_of_stack_reserve': IntegerProperty(),
'size_of_stack_commit': IntegerProperty(),
'size_of_heap_reserve': IntegerProperty(),
'size_of_heap_commit': IntegerProperty(),
'loader_fkags_hex': HexProperty(),
'number_of_rva_and_sizes': IntegerProperty(),
'hashes': HashesProperty(),
}
class WindowsPESection(_STIXBase):
_properties = {
'name': StringProperty(required=True),
'size': IntegerProperty(),
'entropy': FloatProperty(),
'hashes': HashesProperty(),
}
class WindowsPEBinaryExt(_STIXBase):
_properties = {
'pe_type': StringProperty(required=True), # open_vocab
'imphash': StringProperty(),
'machine_hex': HexProperty(),
'number_of_sections': IntegerProperty(),
'time_date_stamp': TimestampProperty(),
'pointer_to_symbol_table_hex': HexProperty(),
'number_of_symbols': IntegerProperty(),
'size_of_optional_header': IntegerProperty(),
'characteristics_hex': HexProperty(),
'file_header_hashes': HashesProperty(),
'optional_header': EmbeddedObjectProperty(type=WindowsPEOptionalHeaderType),
'sections': ListProperty(EmbeddedObjectProperty(type=WindowsPESection)),
}
class File(_Observable):
_type = 'file'
_properties = {
'type': TypeProperty(_type),
'extensions': ExtensionsProperty(),
'extensions': ExtensionsProperty(enclosing_type=_type),
'hashes': HashesProperty(),
'size': IntegerProperty(),
'name': StringProperty(),
@ -184,11 +281,69 @@ class Mutex(_Observable):
}
class HTTPRequestExt(_STIXBase):
_properties = {
'request_method': StringProperty(required=True),
'request_value': StringProperty(required=True),
'request_version': StringProperty(),
'request_header': DictionaryProperty(),
'message_body_length': IntegerProperty(),
'message_body_data_ref': ObjectReferenceProperty(),
}
class ICMPExt(_STIXBase):
_properties = {
'icmp_type_hex': HexProperty(required=True),
'icmp_code_hex': HexProperty(required=True),
}
class SocketExt(_STIXBase):
_properties = {
'address_family': EnumProperty([
"AF_UNSPEC",
"AF_INET",
"AF_IPX",
"AF_APPLETALK",
"AF_NETBIOS",
"AF_INET6",
"AF_IRDA",
"AF_BTH",
], required=True),
'is_blocking': BooleanProperty(),
'is_listening': BooleanProperty(),
'protocol_family': EnumProperty([
"PF_INET",
"PF_IPX",
"PF_APPLETALK",
"PF_INET6",
"PF_AX25",
"PF_NETROM"
]),
'options': DictionaryProperty(),
'socket_type': EnumProperty([
"SOCK_STREAM",
"SOCK_DGRAM",
"SOCK_RAW",
"SOCK_RDM",
"SOCK_SEQPACKET",
]),
}
class TCPExt(_STIXBase):
_properties = {
'src_flags_hex': HexProperty(),
'dst_flags_hex': HexProperty(),
}
class NetworkTraffic(_Observable):
_type = 'network-traffic'
_properties = {
'type': TypeProperty(_type),
# extensions
'extensions': ExtensionsProperty(enclosing_type=_type),
'start': TimestampProperty(),
'end': TimestampProperty(),
'is_active': BooleanProperty(),
@ -213,11 +368,54 @@ class NetworkTraffic(_Observable):
self._check_at_least_one_property(["src_ref", "dst_ref"])
class WindowsProcessExt(_STIXBase):
_properties = {
'aslr_enabled': BooleanProperty(),
'dep_enabled': BooleanProperty(),
'priority': StringProperty(),
'owner_sid': StringProperty(),
'window_title': StringProperty(),
'startup_info': DictionaryProperty(),
}
class WindowsServiceExt(_STIXBase):
_properties = {
'service_name': StringProperty(required=True),
'descriptions': ListProperty(StringProperty),
'display_name': StringProperty(),
'group_name': StringProperty(),
'start_type': EnumProperty([
"SERVICE_AUTO_START",
"SERVICE_BOOT_START",
"SERVICE_DEMAND_START",
"SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT",
]),
'service_dll_refs': ListProperty(ObjectReferenceProperty),
'service_type': EnumProperty([
"SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS",
]),
'service_status': EnumProperty([
"SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED",
"SERVICE_RUNNING",
"SERVICE_START_PENDING",
"SERVICE_STOP_PENDING",
"SERVICE_STOPPED",
]),
}
class Process(_Observable):
_type = 'process'
_properties = {
'type': TypeProperty(_type),
# extensions
'extensions': ExtensionsProperty(enclosing_type=_type),
'is_hidden': BooleanProperty(),
'pid': IntegerProperty(),
'name': StringProperty(),
@ -255,14 +453,23 @@ class URL(_Observable):
}
class UNIXAccountExt(_STIXBase):
_properties = {
'gid': IntegerProperty(),
'groups': ListProperty(StringProperty),
'home_dir': StringProperty(),
'shell': StringProperty(),
}
class UserAccount(_Observable):
_type = 'user-account'
_properties = {
'type': TypeProperty(_type),
# extensions
'extensions': ExtensionsProperty(enclosing_type=_type),
'user_id': StringProperty(required=True),
'account_login': StringProperty(),
'account_type': StringProperty(),
'account_type': StringProperty(), # open vocab
'display_name': StringProperty(),
'is_service_account': BooleanProperty(),
'is_privileged': BooleanProperty(),

View File

@ -15,6 +15,10 @@ class ExternalReference(_STIXBase):
'external_id': StringProperty(),
}
def _check_object_constaints(self):
super(ExternalReference, self)._check_object_constaints()
self._check_at_least_one_property(["description", "external_id", "url"])
class KillChainPhase(_STIXBase):
_properties = {

View File

@ -176,6 +176,14 @@ class IntegerProperty(Property):
raise ValueError("must be an integer.")
class FloatProperty(Property):
def clean(self, value):
try:
return float(value)
except Exception:
raise ValueError("must be an float.")
class BooleanProperty(Property):
def clean(self, value):
@ -379,4 +387,28 @@ class EnumProperty(StringProperty):
class ExtensionsProperty(DictionaryProperty):
pass
def __init__(self, enclosing_type=None, required=False):
self.enclosing_type = enclosing_type
super(ExtensionsProperty, self).__init__(required)
def clean(self, value):
if type(value) is dict:
from .__init__ import EXT_MAP # avoid circular import
if self.enclosing_type in EXT_MAP:
specific_type_map = EXT_MAP[self.enclosing_type]
for key, subvalue in value.items():
if key in specific_type_map:
cls = specific_type_map[key]
if type(subvalue) is dict:
value[key] = cls(**subvalue)
elif type(subvalue) is cls:
value[key] = subvalue
else:
raise ValueError("Cannot determine extension type.")
else:
raise ValueError("The key used in the extensions dictionary is not an extension type name")
else:
raise ValueError("The enclosing type has no extensions defined")
else:
raise ValueError("The extensions property must contain a dictionary")
return value

View File

@ -576,6 +576,28 @@ def test_file_example():
assert f.decryption_key == "fred" # does the key have a format we can test for?
def test_file_example_with_PDFExt():
f = stix2.File(name="qwerty.dll",
extensions={
"pdf-ext": {
"version": "1.7",
"document_info_dict": {
"Title": "Sample document",
"Author": "Adobe Systems Incorporated",
"Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
"Producer": "Acrobat Distiller 3.01 for Power Macintosh",
"CreationDate": "20070412090123-02"
},
"pdfid0": "DFCE52BD827ECF765649852119D",
"pdfid1": "57A1E0F9ED2AE523E313C"
}
})
assert f.name == "qwerty.dll"
assert f.extensions["pdf-ext"].version == "1.7"
assert f.extensions["pdf-ext"].document_info_dict["Title"] == "Sample document"
def test_file_example_encryption_error():
with pytest.raises(stix2.exceptions.DependentPropertiestError) as excinfo:
stix2.File(name="qwerty.dll",